]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/test/test_storage.py
storage: add bucket-counting share crawler, add its output (number of files+directori...
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / test / test_storage.py
1
2 import time, os.path, stat, re
3
4 from twisted.trial import unittest
5
6 from twisted.internet import defer
7 from twisted.application import service
8 from foolscap import eventual
9 import itertools
10 from allmydata import interfaces
11 from allmydata.util import fileutil, hashutil, base32, pollmixin
12 from allmydata.storage.server import StorageServer, storage_index_to_dir
13 from allmydata.storage.mutable import MutableShareFile
14 from allmydata.storage.immutable import BucketWriter, BucketReader
15 from allmydata.storage.common import DataTooLargeError
16 from allmydata.storage.lease import LeaseInfo
17 from allmydata.immutable.layout import WriteBucketProxy, WriteBucketProxy_v2, \
18      ReadBucketProxy
19 from allmydata.interfaces import BadWriteEnablerError
20 from allmydata.test.common import LoggingServiceParent
21 from allmydata.web.storage import StorageStatus, remove_prefix
22
23 class Marker:
24     pass
25 class FakeCanary:
26     def __init__(self, ignore_disconnectors=False):
27         self.ignore = ignore_disconnectors
28         self.disconnectors = {}
29     def notifyOnDisconnect(self, f, *args, **kwargs):
30         if self.ignore:
31             return
32         m = Marker()
33         self.disconnectors[m] = (f, args, kwargs)
34         return m
35     def dontNotifyOnDisconnect(self, marker):
36         if self.ignore:
37             return
38         del self.disconnectors[marker]
39
40 class FakeStatsProvider:
41     def count(self, name, delta=1):
42         pass
43     def register_producer(self, producer):
44         pass
45
46 class Bucket(unittest.TestCase):
47     def make_workdir(self, name):
48         basedir = os.path.join("storage", "Bucket", name)
49         incoming = os.path.join(basedir, "tmp", "bucket")
50         final = os.path.join(basedir, "bucket")
51         fileutil.make_dirs(basedir)
52         fileutil.make_dirs(os.path.join(basedir, "tmp"))
53         return incoming, final
54
55     def bucket_writer_closed(self, bw, consumed):
56         pass
57     def add_latency(self, category, latency):
58         pass
59     def count(self, name, delta=1):
60         pass
61
62     def make_lease(self):
63         owner_num = 0
64         renew_secret = os.urandom(32)
65         cancel_secret = os.urandom(32)
66         expiration_time = time.time() + 5000
67         return LeaseInfo(owner_num, renew_secret, cancel_secret,
68                          expiration_time, "\x00" * 20)
69
70     def test_create(self):
71         incoming, final = self.make_workdir("test_create")
72         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
73                           FakeCanary())
74         bw.remote_write(0, "a"*25)
75         bw.remote_write(25, "b"*25)
76         bw.remote_write(50, "c"*25)
77         bw.remote_write(75, "d"*7)
78         bw.remote_close()
79
80     def test_readwrite(self):
81         incoming, final = self.make_workdir("test_readwrite")
82         bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
83                           FakeCanary())
84         bw.remote_write(0, "a"*25)
85         bw.remote_write(25, "b"*25)
86         bw.remote_write(50, "c"*7) # last block may be short
87         bw.remote_close()
88
89         # now read from it
90         br = BucketReader(self, bw.finalhome)
91         self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
92         self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
93         self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
94
95 class RemoteBucket:
96
97     def callRemote(self, methname, *args, **kwargs):
98         def _call():
99             meth = getattr(self.target, "remote_" + methname)
100             return meth(*args, **kwargs)
101         return defer.maybeDeferred(_call)
102
103 class BucketProxy(unittest.TestCase):
104     def make_bucket(self, name, size):
105         basedir = os.path.join("storage", "BucketProxy", name)
106         incoming = os.path.join(basedir, "tmp", "bucket")
107         final = os.path.join(basedir, "bucket")
108         fileutil.make_dirs(basedir)
109         fileutil.make_dirs(os.path.join(basedir, "tmp"))
110         bw = BucketWriter(self, incoming, final, size, self.make_lease(),
111                           FakeCanary())
112         rb = RemoteBucket()
113         rb.target = bw
114         return bw, rb, final
115
116     def make_lease(self):
117         owner_num = 0
118         renew_secret = os.urandom(32)
119         cancel_secret = os.urandom(32)
120         expiration_time = time.time() + 5000
121         return LeaseInfo(owner_num, renew_secret, cancel_secret,
122                          expiration_time, "\x00" * 20)
123
124     def bucket_writer_closed(self, bw, consumed):
125         pass
126     def add_latency(self, category, latency):
127         pass
128     def count(self, name, delta=1):
129         pass
130
131     def test_create(self):
132         bw, rb, sharefname = self.make_bucket("test_create", 500)
133         bp = WriteBucketProxy(rb,
134                               data_size=300,
135                               block_size=10,
136                               num_segments=5,
137                               num_share_hashes=3,
138                               uri_extension_size_max=500, nodeid=None)
139         self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
140
141     def _do_test_readwrite(self, name, header_size, wbp_class, rbp_class):
142         # Let's pretend each share has 100 bytes of data, and that there are
143         # 4 segments (25 bytes each), and 8 shares total. So the two
144         # per-segment merkle trees (crypttext_hash_tree,
145         # block_hashes) will have 4 leaves and 7 nodes each. The per-share
146         # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
147         # nodes. Furthermore, let's assume the uri_extension is 500 bytes
148         # long. That should make the whole share:
149         #
150         # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
151         # 0x44 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1446 bytes long
152
153         sharesize = header_size + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500
154
155         crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
156                             for i in range(7)]
157         block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
158                         for i in range(7)]
159         share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
160                         for i in (1,9,13)]
161         uri_extension = "s" + "E"*498 + "e"
162
163         bw, rb, sharefname = self.make_bucket(name, sharesize)
164         bp = wbp_class(rb,
165                        data_size=95,
166                        block_size=25,
167                        num_segments=4,
168                        num_share_hashes=3,
169                        uri_extension_size_max=len(uri_extension),
170                        nodeid=None)
171
172         d = bp.put_header()
173         d.addCallback(lambda res: bp.put_block(0, "a"*25))
174         d.addCallback(lambda res: bp.put_block(1, "b"*25))
175         d.addCallback(lambda res: bp.put_block(2, "c"*25))
176         d.addCallback(lambda res: bp.put_block(3, "d"*20))
177         d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
178         d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
179         d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
180         d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
181         d.addCallback(lambda res: bp.close())
182
183         # now read everything back
184         def _start_reading(res):
185             br = BucketReader(self, sharefname)
186             rb = RemoteBucket()
187             rb.target = br
188             rbp = rbp_class(rb, peerid="abc", storage_index="")
189             self.failUnless("to peer" in repr(rbp))
190             self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
191
192             d1 = rbp.get_block_data(0, 25, 25)
193             d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
194             d1.addCallback(lambda res: rbp.get_block_data(1, 25, 25))
195             d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
196             d1.addCallback(lambda res: rbp.get_block_data(2, 25, 25))
197             d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
198             d1.addCallback(lambda res: rbp.get_block_data(3, 25, 20))
199             d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
200
201             d1.addCallback(lambda res: rbp.get_crypttext_hashes())
202             d1.addCallback(lambda res:
203                            self.failUnlessEqual(res, crypttext_hashes))
204             d1.addCallback(lambda res: rbp.get_block_hashes(set(range(4))))
205             d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
206             d1.addCallback(lambda res: rbp.get_share_hashes())
207             d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
208             d1.addCallback(lambda res: rbp.get_uri_extension())
209             d1.addCallback(lambda res:
210                            self.failUnlessEqual(res, uri_extension))
211
212             return d1
213
214         d.addCallback(_start_reading)
215
216         return d
217
218     def test_readwrite_v1(self):
219         return self._do_test_readwrite("test_readwrite_v1",
220                                        0x24, WriteBucketProxy, ReadBucketProxy)
221
222     def test_readwrite_v2(self):
223         return self._do_test_readwrite("test_readwrite_v2",
224                                        0x44, WriteBucketProxy_v2, ReadBucketProxy)
225
226 class FakeDiskStorageServer(StorageServer):
227     def stat_disk(self, d):
228         return self.DISKAVAIL
229
230 class Server(unittest.TestCase):
231
232     def setUp(self):
233         self.sparent = LoggingServiceParent()
234         self._lease_secret = itertools.count()
235     def tearDown(self):
236         return self.sparent.stopService()
237
238     def workdir(self, name):
239         basedir = os.path.join("storage", "Server", name)
240         return basedir
241
242     def create(self, name, reserved_space=0, klass=StorageServer):
243         workdir = self.workdir(name)
244         ss = klass(workdir, "\x00" * 20, reserved_space=reserved_space,
245                    stats_provider=FakeStatsProvider())
246         ss.setServiceParent(self.sparent)
247         return ss
248
249     def test_create(self):
250         ss = self.create("test_create")
251
252     def allocate(self, ss, storage_index, sharenums, size, canary=None):
253         renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
254         cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
255         if not canary:
256             canary = FakeCanary()
257         return ss.remote_allocate_buckets(storage_index,
258                                           renew_secret, cancel_secret,
259                                           sharenums, size, canary)
260
261     def test_large_share(self):
262         ss = self.create("test_large_share")
263
264         already,writers = self.allocate(ss, "allocate", [0], 2**32+2)
265         self.failUnlessEqual(already, set())
266         self.failUnlessEqual(set(writers.keys()), set([0]))
267
268         shnum, bucket = writers.items()[0]
269         # This test is going to hammer your filesystem if it doesn't make a sparse file for this.  :-(
270         bucket.remote_write(2**32, "ab")
271         bucket.remote_close()
272
273         readers = ss.remote_get_buckets("allocate")
274         reader = readers[shnum]
275         self.failUnlessEqual(reader.remote_read(2**32, 2), "ab")
276     test_large_share.skip = "This test can spuriously fail if you have less than 4 GiB free on your filesystem, and if your filesystem doesn't support efficient sparse files then it is very expensive (Mac OS X is the only system I know of in the desktop/server area that doesn't support efficient sparse files)."
277
278     def test_dont_overfill_dirs(self):
279         """
280         This test asserts that if you add a second share whose storage index
281         share lots of leading bits with an extant share (but isn't the exact
282         same storage index), this won't add an entry to the share directory.
283         """
284         ss = self.create("test_dont_overfill_dirs")
285         already, writers = self.allocate(ss, "storageindex", [0], 10)
286         for i, wb in writers.items():
287             wb.remote_write(0, "%10d" % i)
288             wb.remote_close()
289         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
290                                 "shares")
291         children_of_storedir = set(os.listdir(storedir))
292
293         # Now store another one under another storageindex that has leading
294         # chars the same as the first storageindex.
295         already, writers = self.allocate(ss, "storageindey", [0], 10)
296         for i, wb in writers.items():
297             wb.remote_write(0, "%10d" % i)
298             wb.remote_close()
299         storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
300                                 "shares")
301         new_children_of_storedir = set(os.listdir(storedir))
302         self.failUnlessEqual(children_of_storedir, new_children_of_storedir)
303
304     def test_remove_incoming(self):
305         ss = self.create("test_remove_incoming")
306         already, writers = self.allocate(ss, "vid", range(3), 10)
307         for i,wb in writers.items():
308             wb.remote_write(0, "%10d" % i)
309             wb.remote_close()
310         incoming_share_dir = wb.incominghome
311         incoming_bucket_dir = os.path.dirname(incoming_share_dir)
312         incoming_prefix_dir = os.path.dirname(incoming_bucket_dir)
313         incoming_dir = os.path.dirname(incoming_prefix_dir)
314         self.failIf(os.path.exists(incoming_bucket_dir))
315         self.failIf(os.path.exists(incoming_prefix_dir))
316         self.failUnless(os.path.exists(incoming_dir))
317
318     def test_allocate(self):
319         ss = self.create("test_allocate")
320
321         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
322
323         canary = FakeCanary()
324         already,writers = self.allocate(ss, "allocate", [0,1,2], 75)
325         self.failUnlessEqual(already, set())
326         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
327
328         # while the buckets are open, they should not count as readable
329         self.failUnlessEqual(ss.remote_get_buckets("allocate"), {})
330
331         # close the buckets
332         for i,wb in writers.items():
333             wb.remote_write(0, "%25d" % i)
334             wb.remote_close()
335             # aborting a bucket that was already closed is a no-op
336             wb.remote_abort()
337
338         # now they should be readable
339         b = ss.remote_get_buckets("allocate")
340         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
341         self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
342
343         # now if we ask about writing again, the server should offer those
344         # three buckets as already present. It should offer them even if we
345         # don't ask about those specific ones.
346         already,writers = self.allocate(ss, "allocate", [2,3,4], 75)
347         self.failUnlessEqual(already, set([0,1,2]))
348         self.failUnlessEqual(set(writers.keys()), set([3,4]))
349
350         # while those two buckets are open for writing, the server should
351         # refuse to offer them to uploaders
352
353         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
354         self.failUnlessEqual(already2, set([0,1,2]))
355         self.failUnlessEqual(set(writers2.keys()), set([5]))
356
357         # aborting the writes should remove the tempfiles
358         for i,wb in writers2.items():
359             wb.remote_abort()
360         already2,writers2 = self.allocate(ss, "allocate", [2,3,4,5], 75)
361         self.failUnlessEqual(already2, set([0,1,2]))
362         self.failUnlessEqual(set(writers2.keys()), set([5]))
363
364         for i,wb in writers2.items():
365             wb.remote_abort()
366         for i,wb in writers.items():
367             wb.remote_abort()
368
369     def test_disconnect(self):
370         # simulate a disconnection
371         ss = self.create("test_disconnect")
372         canary = FakeCanary()
373         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75, canary)
374         self.failUnlessEqual(already, set())
375         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
376         for (f,args,kwargs) in canary.disconnectors.values():
377             f(*args, **kwargs)
378         del already
379         del writers
380
381         # that ought to delete the incoming shares
382         already,writers = self.allocate(ss, "disconnect", [0,1,2], 75)
383         self.failUnlessEqual(already, set())
384         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
385
386     def test_reserved_space(self):
387         ss = self.create("test_reserved_space", reserved_space=10000,
388                          klass=FakeDiskStorageServer)
389         # the FakeDiskStorageServer doesn't do real statvfs() calls
390         ss.DISKAVAIL = 15000
391         # 15k available, 10k reserved, leaves 5k for shares
392
393         # a newly created and filled share incurs this much overhead, beyond
394         # the size we request.
395         OVERHEAD = 3*4
396         LEASE_SIZE = 4+32+32+4
397         canary = FakeCanary(True)
398         already,writers = self.allocate(ss, "vid1", [0,1,2], 1000, canary)
399         self.failUnlessEqual(len(writers), 3)
400         # now the StorageServer should have 3000 bytes provisionally
401         # allocated, allowing only 2000 more to be claimed
402         self.failUnlessEqual(len(ss._active_writers), 3)
403
404         # allocating 1001-byte shares only leaves room for one
405         already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001, canary)
406         self.failUnlessEqual(len(writers2), 1)
407         self.failUnlessEqual(len(ss._active_writers), 4)
408
409         # we abandon the first set, so their provisional allocation should be
410         # returned
411         del already
412         del writers
413         self.failUnlessEqual(len(ss._active_writers), 1)
414         # now we have a provisional allocation of 1001 bytes
415
416         # and we close the second set, so their provisional allocation should
417         # become real, long-term allocation, and grows to include the
418         # overhead.
419         for bw in writers2.values():
420             bw.remote_write(0, "a"*25)
421             bw.remote_close()
422         del already2
423         del writers2
424         del bw
425         self.failUnlessEqual(len(ss._active_writers), 0)
426
427         allocated = 1001 + OVERHEAD + LEASE_SIZE
428
429         # we have to manually increase DISKAVAIL, since we're not doing real
430         # disk measurements
431         ss.DISKAVAIL -= allocated
432
433         # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
434         # 5000-1085=3915 free, therefore we can fit 39 100byte shares
435         already3,writers3 = self.allocate(ss,"vid3", range(100), 100, canary)
436         self.failUnlessEqual(len(writers3), 39)
437         self.failUnlessEqual(len(ss._active_writers), 39)
438
439         del already3
440         del writers3
441         self.failUnlessEqual(len(ss._active_writers), 0)
442         ss.disownServiceParent()
443         del ss
444
445     def test_seek(self):
446         basedir = self.workdir("test_seek_behavior")
447         fileutil.make_dirs(basedir)
448         filename = os.path.join(basedir, "testfile")
449         f = open(filename, "wb")
450         f.write("start")
451         f.close()
452         # mode="w" allows seeking-to-create-holes, but truncates pre-existing
453         # files. mode="a" preserves previous contents but does not allow
454         # seeking-to-create-holes. mode="r+" allows both.
455         f = open(filename, "rb+")
456         f.seek(100)
457         f.write("100")
458         f.close()
459         filelen = os.stat(filename)[stat.ST_SIZE]
460         self.failUnlessEqual(filelen, 100+3)
461         f2 = open(filename, "rb")
462         self.failUnlessEqual(f2.read(5), "start")
463
464
465     def test_leases(self):
466         ss = self.create("test_leases")
467         canary = FakeCanary()
468         sharenums = range(5)
469         size = 100
470
471         rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
472                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
473         already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
474                                                      sharenums, size, canary)
475         self.failUnlessEqual(len(already), 0)
476         self.failUnlessEqual(len(writers), 5)
477         for wb in writers.values():
478             wb.remote_close()
479
480         leases = list(ss.get_leases("si0"))
481         self.failUnlessEqual(len(leases), 1)
482         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs0]))
483
484         rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
485                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
486         already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
487                                                      sharenums, size, canary)
488         for wb in writers.values():
489             wb.remote_close()
490
491         # take out a second lease on si1
492         rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
493                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
494         already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
495                                                      sharenums, size, canary)
496         self.failUnlessEqual(len(already), 5)
497         self.failUnlessEqual(len(writers), 0)
498
499         leases = list(ss.get_leases("si1"))
500         self.failUnlessEqual(len(leases), 2)
501         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2]))
502
503         # and a third lease, using add-lease
504         rs2a,cs2a = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
505                      hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
506         ss.remote_add_lease("si1", rs2a, cs2a)
507         leases = list(ss.get_leases("si1"))
508         self.failUnlessEqual(len(leases), 3)
509         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs1, rs2, rs2a]))
510
511         # add-lease on a missing storage index is silently ignored
512         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
513
514         # check that si0 is readable
515         readers = ss.remote_get_buckets("si0")
516         self.failUnlessEqual(len(readers), 5)
517
518         # renew the first lease. Only the proper renew_secret should work
519         ss.remote_renew_lease("si0", rs0)
520         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
521         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
522
523         # check that si0 is still readable
524         readers = ss.remote_get_buckets("si0")
525         self.failUnlessEqual(len(readers), 5)
526
527         # now cancel it
528         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
529         self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
530         ss.remote_cancel_lease("si0", cs0)
531
532         # si0 should now be gone
533         readers = ss.remote_get_buckets("si0")
534         self.failUnlessEqual(len(readers), 0)
535         # and the renew should no longer work
536         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
537
538
539         # cancel the first lease on si1, leaving the second and third in place
540         ss.remote_cancel_lease("si1", cs1)
541         readers = ss.remote_get_buckets("si1")
542         self.failUnlessEqual(len(readers), 5)
543         # the corresponding renew should no longer work
544         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
545
546         leases = list(ss.get_leases("si1"))
547         self.failUnlessEqual(len(leases), 2)
548         self.failUnlessEqual(set([l.renew_secret for l in leases]), set([rs2, rs2a]))
549
550         ss.remote_renew_lease("si1", rs2)
551         # cancelling the second and third should make it go away
552         ss.remote_cancel_lease("si1", cs2)
553         ss.remote_cancel_lease("si1", cs2a)
554         readers = ss.remote_get_buckets("si1")
555         self.failUnlessEqual(len(readers), 0)
556         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
557         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
558         self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2a)
559
560         leases = list(ss.get_leases("si1"))
561         self.failUnlessEqual(len(leases), 0)
562
563
564         # test overlapping uploads
565         rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
566                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
567         rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
568                    hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
569         already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
570                                                      sharenums, size, canary)
571         self.failUnlessEqual(len(already), 0)
572         self.failUnlessEqual(len(writers), 5)
573         already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
574                                                        sharenums, size, canary)
575         self.failUnlessEqual(len(already2), 0)
576         self.failUnlessEqual(len(writers2), 0)
577         for wb in writers.values():
578             wb.remote_close()
579
580         leases = list(ss.get_leases("si3"))
581         self.failUnlessEqual(len(leases), 1)
582
583         already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
584                                                        sharenums, size, canary)
585         self.failUnlessEqual(len(already3), 5)
586         self.failUnlessEqual(len(writers3), 0)
587
588         leases = list(ss.get_leases("si3"))
589         self.failUnlessEqual(len(leases), 2)
590
591     def test_readonly(self):
592         workdir = self.workdir("test_readonly")
593         ss = StorageServer(workdir, "\x00" * 20, readonly_storage=True)
594         ss.setServiceParent(self.sparent)
595
596         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
597         self.failUnlessEqual(already, set())
598         self.failUnlessEqual(writers, {})
599
600         stats = ss.get_stats()
601         self.failUnlessEqual(stats["storage_server.accepting_immutable_shares"],
602                              False)
603         if "storage_server.disk_avail" in stats:
604             # windows does not have os.statvfs, so it doesn't give us disk
605             # stats. But if there are stats, readonly_storage means
606             # disk_avail=0
607             self.failUnlessEqual(stats["storage_server.disk_avail"], 0)
608
609     def test_discard(self):
610         # discard is really only used for other tests, but we test it anyways
611         workdir = self.workdir("test_discard")
612         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
613         ss.setServiceParent(self.sparent)
614
615         canary = FakeCanary()
616         already,writers = self.allocate(ss, "vid", [0,1,2], 75)
617         self.failUnlessEqual(already, set())
618         self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
619         for i,wb in writers.items():
620             wb.remote_write(0, "%25d" % i)
621             wb.remote_close()
622         # since we discard the data, the shares should be present but sparse.
623         # Since we write with some seeks, the data we read back will be all
624         # zeros.
625         b = ss.remote_get_buckets("vid")
626         self.failUnlessEqual(set(b.keys()), set([0,1,2]))
627         self.failUnlessEqual(b[0].remote_read(0, 25), "\x00" * 25)
628
629     def test_advise_corruption(self):
630         workdir = self.workdir("test_advise_corruption")
631         ss = StorageServer(workdir, "\x00" * 20, discard_storage=True)
632         ss.setServiceParent(self.sparent)
633
634         si0_s = base32.b2a("si0")
635         ss.remote_advise_corrupt_share("immutable", "si0", 0,
636                                        "This share smells funny.\n")
637         reportdir = os.path.join(workdir, "corruption-advisories")
638         reports = os.listdir(reportdir)
639         self.failUnlessEqual(len(reports), 1)
640         report_si0 = reports[0]
641         self.failUnless(si0_s in report_si0, report_si0)
642         f = open(os.path.join(reportdir, report_si0), "r")
643         report = f.read()
644         f.close()
645         self.failUnless("type: immutable" in report)
646         self.failUnless(("storage_index: %s" % si0_s) in report)
647         self.failUnless("share_number: 0" in report)
648         self.failUnless("This share smells funny." in report)
649
650         # test the RIBucketWriter version too
651         si1_s = base32.b2a("si1")
652         already,writers = self.allocate(ss, "si1", [1], 75)
653         self.failUnlessEqual(already, set())
654         self.failUnlessEqual(set(writers.keys()), set([1]))
655         writers[1].remote_write(0, "data")
656         writers[1].remote_close()
657
658         b = ss.remote_get_buckets("si1")
659         self.failUnlessEqual(set(b.keys()), set([1]))
660         b[1].remote_advise_corrupt_share("This share tastes like dust.\n")
661
662         reports = os.listdir(reportdir)
663         self.failUnlessEqual(len(reports), 2)
664         report_si1 = [r for r in reports if si1_s in r][0]
665         f = open(os.path.join(reportdir, report_si1), "r")
666         report = f.read()
667         f.close()
668         self.failUnless("type: immutable" in report)
669         self.failUnless(("storage_index: %s" % si1_s) in report)
670         self.failUnless("share_number: 1" in report)
671         self.failUnless("This share tastes like dust." in report)
672
673
674
675 class MutableServer(unittest.TestCase):
676
677     def setUp(self):
678         self.sparent = LoggingServiceParent()
679         self._lease_secret = itertools.count()
680     def tearDown(self):
681         return self.sparent.stopService()
682
683     def workdir(self, name):
684         basedir = os.path.join("storage", "MutableServer", name)
685         return basedir
686
687     def create(self, name):
688         workdir = self.workdir(name)
689         ss = StorageServer(workdir, "\x00" * 20)
690         ss.setServiceParent(self.sparent)
691         return ss
692
693     def test_create(self):
694         ss = self.create("test_create")
695
696     def write_enabler(self, we_tag):
697         return hashutil.tagged_hash("we_blah", we_tag)
698
699     def renew_secret(self, tag):
700         return hashutil.tagged_hash("renew_blah", str(tag))
701
702     def cancel_secret(self, tag):
703         return hashutil.tagged_hash("cancel_blah", str(tag))
704
705     def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
706         write_enabler = self.write_enabler(we_tag)
707         renew_secret = self.renew_secret(lease_tag)
708         cancel_secret = self.cancel_secret(lease_tag)
709         rstaraw = ss.remote_slot_testv_and_readv_and_writev
710         testandwritev = dict( [ (shnum, ([], [], None) )
711                          for shnum in sharenums ] )
712         readv = []
713         rc = rstaraw(storage_index,
714                      (write_enabler, renew_secret, cancel_secret),
715                      testandwritev,
716                      readv)
717         (did_write, readv_data) = rc
718         self.failUnless(did_write)
719         self.failUnless(isinstance(readv_data, dict))
720         self.failUnlessEqual(len(readv_data), 0)
721
722     def test_container_size(self):
723         ss = self.create("test_container_size")
724         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
725                       set([0,1,2]), 100)
726         rstaraw = ss.remote_slot_testv_and_readv_and_writev
727         secrets = ( self.write_enabler("we1"),
728                     self.renew_secret("we1"),
729                     self.cancel_secret("we1") )
730         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
731         answer = rstaraw("si1", secrets,
732                          {0: ([], [(0,data)], len(data)+12)},
733                          [])
734         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
735
736         # trying to make the container too large will raise an exception
737         TOOBIG = MutableShareFile.MAX_SIZE + 10
738         self.failUnlessRaises(DataTooLargeError,
739                               rstaraw, "si1", secrets,
740                               {0: ([], [(0,data)], TOOBIG)},
741                               [])
742
743         # it should be possible to make the container smaller, although at
744         # the moment this doesn't actually affect the share
745         answer = rstaraw("si1", secrets,
746                          {0: ([], [(0,data)], len(data)+8)},
747                          [])
748         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
749
750     def test_allocate(self):
751         ss = self.create("test_allocate")
752         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
753                       set([0,1,2]), 100)
754
755         read = ss.remote_slot_readv
756         self.failUnlessEqual(read("si1", [0], [(0, 10)]),
757                              {0: [""]})
758         self.failUnlessEqual(read("si1", [], [(0, 10)]),
759                              {0: [""], 1: [""], 2: [""]})
760         self.failUnlessEqual(read("si1", [0], [(100, 10)]),
761                              {0: [""]})
762
763         # try writing to one
764         secrets = ( self.write_enabler("we1"),
765                     self.renew_secret("we1"),
766                     self.cancel_secret("we1") )
767         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
768         write = ss.remote_slot_testv_and_readv_and_writev
769         answer = write("si1", secrets,
770                        {0: ([], [(0,data)], None)},
771                        [])
772         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
773
774         self.failUnlessEqual(read("si1", [0], [(0,20)]),
775                              {0: ["00000000001111111111"]})
776         self.failUnlessEqual(read("si1", [0], [(95,10)]),
777                              {0: ["99999"]})
778         #self.failUnlessEqual(s0.remote_get_length(), 100)
779
780         bad_secrets = ("bad write enabler", secrets[1], secrets[2])
781         f = self.failUnlessRaises(BadWriteEnablerError,
782                                   write, "si1", bad_secrets,
783                                   {}, [])
784         self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)
785
786         # this testv should fail
787         answer = write("si1", secrets,
788                        {0: ([(0, 12, "eq", "444444444444"),
789                              (20, 5, "eq", "22222"),
790                              ],
791                             [(0, "x"*100)],
792                             None),
793                         },
794                        [(0,12), (20,5)],
795                        )
796         self.failUnlessEqual(answer, (False,
797                                       {0: ["000000000011", "22222"],
798                                        1: ["", ""],
799                                        2: ["", ""],
800                                        }))
801         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
802
803         # as should this one
804         answer = write("si1", secrets,
805                        {0: ([(10, 5, "lt", "11111"),
806                              ],
807                             [(0, "x"*100)],
808                             None),
809                         },
810                        [(10,5)],
811                        )
812         self.failUnlessEqual(answer, (False,
813                                       {0: ["11111"],
814                                        1: [""],
815                                        2: [""]},
816                                       ))
817         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
818
819
820     def test_operators(self):
821         # test operators, the data we're comparing is '11111' in all cases.
822         # test both fail+pass, reset data after each one.
823         ss = self.create("test_operators")
824
825         secrets = ( self.write_enabler("we1"),
826                     self.renew_secret("we1"),
827                     self.cancel_secret("we1") )
828         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
829         write = ss.remote_slot_testv_and_readv_and_writev
830         read = ss.remote_slot_readv
831
832         def reset():
833             write("si1", secrets,
834                   {0: ([], [(0,data)], None)},
835                   [])
836
837         reset()
838
839         #  lt
840         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
841                                              ],
842                                             [(0, "x"*100)],
843                                             None,
844                                             )}, [(10,5)])
845         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
846         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
847         self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
848         reset()
849
850         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
851                                              ],
852                                             [(0, "x"*100)],
853                                             None,
854                                             )}, [(10,5)])
855         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
856         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
857         reset()
858
859         answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
860                                              ],
861                                             [(0, "y"*100)],
862                                             None,
863                                             )}, [(10,5)])
864         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
865         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
866         reset()
867
868         #  le
869         answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
870                                              ],
871                                             [(0, "x"*100)],
872                                             None,
873                                             )}, [(10,5)])
874         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
875         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
876         reset()
877
878         answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
879                                              ],
880                                             [(0, "y"*100)],
881                                             None,
882                                             )}, [(10,5)])
883         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
884         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
885         reset()
886
887         answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
888                                              ],
889                                             [(0, "y"*100)],
890                                             None,
891                                             )}, [(10,5)])
892         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
893         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
894         reset()
895
896         #  eq
897         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
898                                              ],
899                                             [(0, "x"*100)],
900                                             None,
901                                             )}, [(10,5)])
902         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
903         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
904         reset()
905
906         answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
907                                              ],
908                                             [(0, "y"*100)],
909                                             None,
910                                             )}, [(10,5)])
911         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
912         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
913         reset()
914
915         #  ne
916         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
917                                              ],
918                                             [(0, "x"*100)],
919                                             None,
920                                             )}, [(10,5)])
921         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
922         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
923         reset()
924
925         answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
926                                              ],
927                                             [(0, "y"*100)],
928                                             None,
929                                             )}, [(10,5)])
930         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
931         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
932         reset()
933
934         #  ge
935         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
936                                              ],
937                                             [(0, "y"*100)],
938                                             None,
939                                             )}, [(10,5)])
940         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
941         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
942         reset()
943
944         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
945                                              ],
946                                             [(0, "y"*100)],
947                                             None,
948                                             )}, [(10,5)])
949         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
950         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
951         reset()
952
953         answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
954                                              ],
955                                             [(0, "y"*100)],
956                                             None,
957                                             )}, [(10,5)])
958         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
959         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
960         reset()
961
962         #  gt
963         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
964                                              ],
965                                             [(0, "y"*100)],
966                                             None,
967                                             )}, [(10,5)])
968         self.failUnlessEqual(answer, (True, {0: ["11111"]}))
969         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
970         reset()
971
972         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
973                                              ],
974                                             [(0, "x"*100)],
975                                             None,
976                                             )}, [(10,5)])
977         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
978         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
979         reset()
980
981         answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
982                                              ],
983                                             [(0, "x"*100)],
984                                             None,
985                                             )}, [(10,5)])
986         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
987         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
988         reset()
989
990         # finally, test some operators against empty shares
991         answer = write("si1", secrets, {1: ([(10, 5, "eq", "11112"),
992                                              ],
993                                             [(0, "x"*100)],
994                                             None,
995                                             )}, [(10,5)])
996         self.failUnlessEqual(answer, (False, {0: ["11111"]}))
997         self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
998         reset()
999
1000     def test_readv(self):
1001         ss = self.create("test_readv")
1002         secrets = ( self.write_enabler("we1"),
1003                     self.renew_secret("we1"),
1004                     self.cancel_secret("we1") )
1005         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1006         write = ss.remote_slot_testv_and_readv_and_writev
1007         read = ss.remote_slot_readv
1008         data = [("%d" % i) * 100 for i in range(3)]
1009         rc = write("si1", secrets,
1010                    {0: ([], [(0,data[0])], None),
1011                     1: ([], [(0,data[1])], None),
1012                     2: ([], [(0,data[2])], None),
1013                     }, [])
1014         self.failUnlessEqual(rc, (True, {}))
1015
1016         answer = read("si1", [], [(0, 10)])
1017         self.failUnlessEqual(answer, {0: ["0"*10],
1018                                       1: ["1"*10],
1019                                       2: ["2"*10]})
1020
1021     def compare_leases_without_timestamps(self, leases_a, leases_b):
1022         self.failUnlessEqual(len(leases_a), len(leases_b))
1023         for i in range(len(leases_a)):
1024             num_a, a = leases_a[i]
1025             num_b, b = leases_b[i]
1026             self.failUnlessEqual(num_a, num_b)
1027             self.failUnlessEqual(a.owner_num,       b.owner_num)
1028             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1029             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1030             self.failUnlessEqual(a.nodeid,          b.nodeid)
1031
1032     def compare_leases(self, leases_a, leases_b):
1033         self.failUnlessEqual(len(leases_a), len(leases_b))
1034         for i in range(len(leases_a)):
1035             num_a, a = leases_a[i]
1036             num_b, b = leases_b[i]
1037             self.failUnlessEqual(num_a, num_b)
1038             self.failUnlessEqual(a.owner_num,       b.owner_num)
1039             self.failUnlessEqual(a.renew_secret,    b.renew_secret)
1040             self.failUnlessEqual(a.cancel_secret,   b.cancel_secret)
1041             self.failUnlessEqual(a.nodeid,          b.nodeid)
1042             self.failUnlessEqual(a.expiration_time, b.expiration_time)
1043
1044     def test_leases(self):
1045         ss = self.create("test_leases")
1046         def secrets(n):
1047             return ( self.write_enabler("we1"),
1048                      self.renew_secret("we1-%d" % n),
1049                      self.cancel_secret("we1-%d" % n) )
1050         data = "".join([ ("%d" % i) * 10 for i in range(10) ])
1051         write = ss.remote_slot_testv_and_readv_and_writev
1052         read = ss.remote_slot_readv
1053         rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1054         self.failUnlessEqual(rc, (True, {}))
1055
1056         # create a random non-numeric file in the bucket directory, to
1057         # exercise the code that's supposed to ignore those.
1058         bucket_dir = os.path.join(self.workdir("test_leases"),
1059                                   "shares", storage_index_to_dir("si1"))
1060         f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
1061         f.write("you ought to be ignoring me\n")
1062         f.close()
1063
1064         s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
1065         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1066
1067         # add-lease on a missing storage index is silently ignored
1068         self.failUnlessEqual(ss.remote_add_lease("si18", "", ""), None)
1069
1070         # re-allocate the slots and use the same secrets, that should update
1071         # the lease
1072         write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
1073         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1074
1075         # renew it directly
1076         ss.remote_renew_lease("si1", secrets(0)[1])
1077         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1078
1079         # now allocate them with a bunch of different secrets, to trigger the
1080         # extended lease code. Use add_lease for one of them.
1081         write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
1082         self.failUnlessEqual(len(s0.debug_get_leases()), 2)
1083         secrets2 = secrets(2)
1084         ss.remote_add_lease("si1", secrets2[1], secrets2[2])
1085         self.failUnlessEqual(len(s0.debug_get_leases()), 3)
1086         write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
1087         write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
1088         write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
1089
1090         self.failUnlessEqual(len(s0.debug_get_leases()), 6)
1091
1092         # cancel one of them
1093         ss.remote_cancel_lease("si1", secrets(5)[2])
1094         self.failUnlessEqual(len(s0.debug_get_leases()), 5)
1095
1096         all_leases = s0.debug_get_leases()
1097         # and write enough data to expand the container, forcing the server
1098         # to move the leases
1099         write("si1", secrets(0),
1100               {0: ([], [(0,data)], 200), },
1101               [])
1102
1103         # read back the leases, make sure they're still intact.
1104         self.compare_leases_without_timestamps(all_leases,
1105                                                s0.debug_get_leases())
1106
1107         ss.remote_renew_lease("si1", secrets(0)[1])
1108         ss.remote_renew_lease("si1", secrets(1)[1])
1109         ss.remote_renew_lease("si1", secrets(2)[1])
1110         ss.remote_renew_lease("si1", secrets(3)[1])
1111         ss.remote_renew_lease("si1", secrets(4)[1])
1112         self.compare_leases_without_timestamps(all_leases,
1113                                                s0.debug_get_leases())
1114         # get a new copy of the leases, with the current timestamps. Reading
1115         # data and failing to renew/cancel leases should leave the timestamps
1116         # alone.
1117         all_leases = s0.debug_get_leases()
1118         # renewing with a bogus token should prompt an error message
1119
1120         # examine the exception thus raised, make sure the old nodeid is
1121         # present, to provide for share migration
1122         e = self.failUnlessRaises(IndexError,
1123                                   ss.remote_renew_lease, "si1",
1124                                   secrets(20)[1])
1125         e_s = str(e)
1126         self.failUnless("Unable to renew non-existent lease" in e_s)
1127         self.failUnless("I have leases accepted by nodeids:" in e_s)
1128         self.failUnless("nodeids: 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' ." in e_s)
1129
1130         # same for cancelling
1131         self.failUnlessRaises(IndexError,
1132                               ss.remote_cancel_lease, "si1",
1133                               secrets(20)[2])
1134         self.compare_leases(all_leases, s0.debug_get_leases())
1135
1136         # reading shares should not modify the timestamp
1137         read("si1", [], [(0,200)])
1138         self.compare_leases(all_leases, s0.debug_get_leases())
1139
1140         write("si1", secrets(0),
1141               {0: ([], [(200, "make me bigger")], None)}, [])
1142         self.compare_leases_without_timestamps(all_leases,
1143                                                s0.debug_get_leases())
1144
1145         write("si1", secrets(0),
1146               {0: ([], [(500, "make me really bigger")], None)}, [])
1147         self.compare_leases_without_timestamps(all_leases,
1148                                                s0.debug_get_leases())
1149
1150         # now cancel them all
1151         ss.remote_cancel_lease("si1", secrets(0)[2])
1152         ss.remote_cancel_lease("si1", secrets(1)[2])
1153         ss.remote_cancel_lease("si1", secrets(2)[2])
1154         ss.remote_cancel_lease("si1", secrets(3)[2])
1155
1156         # the slot should still be there
1157         remaining_shares = read("si1", [], [(0,10)])
1158         self.failUnlessEqual(len(remaining_shares), 1)
1159         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1160
1161         # cancelling a non-existent lease should raise an IndexError
1162         self.failUnlessRaises(IndexError,
1163                               ss.remote_cancel_lease, "si1", "nonsecret")
1164
1165         # and the slot should still be there
1166         remaining_shares = read("si1", [], [(0,10)])
1167         self.failUnlessEqual(len(remaining_shares), 1)
1168         self.failUnlessEqual(len(s0.debug_get_leases()), 1)
1169
1170         ss.remote_cancel_lease("si1", secrets(4)[2])
1171         # now the slot should be gone
1172         no_shares = read("si1", [], [(0,10)])
1173         self.failUnlessEqual(no_shares, {})
1174
1175         # cancelling a lease on a non-existent share should raise an IndexError
1176         self.failUnlessRaises(IndexError,
1177                               ss.remote_cancel_lease, "si2", "nonsecret")
1178
1179     def test_remove(self):
1180         ss = self.create("test_remove")
1181         self.allocate(ss, "si1", "we1", self._lease_secret.next(),
1182                       set([0,1,2]), 100)
1183         readv = ss.remote_slot_readv
1184         writev = ss.remote_slot_testv_and_readv_and_writev
1185         secrets = ( self.write_enabler("we1"),
1186                     self.renew_secret("we1"),
1187                     self.cancel_secret("we1") )
1188         # delete sh0 by setting its size to zero
1189         answer = writev("si1", secrets,
1190                         {0: ([], [], 0)},
1191                         [])
1192         # the answer should mention all the shares that existed before the
1193         # write
1194         self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
1195         # but a new read should show only sh1 and sh2
1196         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1197                              {1: [""], 2: [""]})
1198
1199         # delete sh1 by setting its size to zero
1200         answer = writev("si1", secrets,
1201                         {1: ([], [], 0)},
1202                         [])
1203         self.failUnlessEqual(answer, (True, {1:[],2:[]}) )
1204         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1205                              {2: [""]})
1206
1207         # delete sh2 by setting its size to zero
1208         answer = writev("si1", secrets,
1209                         {2: ([], [], 0)},
1210                         [])
1211         self.failUnlessEqual(answer, (True, {2:[]}) )
1212         self.failUnlessEqual(readv("si1", [], [(0,10)]),
1213                              {})
1214         # and the bucket directory should now be gone
1215         si = base32.b2a("si1")
1216         # note: this is a detail of the storage server implementation, and
1217         # may change in the future
1218         prefix = si[:2]
1219         prefixdir = os.path.join(self.workdir("test_remove"), "shares", prefix)
1220         bucketdir = os.path.join(prefixdir, si)
1221         self.failUnless(os.path.exists(prefixdir))
1222         self.failIf(os.path.exists(bucketdir))
1223
1224 class Stats(unittest.TestCase):
1225
1226     def setUp(self):
1227         self.sparent = LoggingServiceParent()
1228         self._lease_secret = itertools.count()
1229     def tearDown(self):
1230         return self.sparent.stopService()
1231
1232     def workdir(self, name):
1233         basedir = os.path.join("storage", "Server", name)
1234         return basedir
1235
1236     def create(self, name):
1237         workdir = self.workdir(name)
1238         ss = StorageServer(workdir, "\x00" * 20)
1239         ss.setServiceParent(self.sparent)
1240         return ss
1241
1242     def test_latencies(self):
1243         ss = self.create("test_latencies")
1244         for i in range(10000):
1245             ss.add_latency("allocate", 1.0 * i)
1246         for i in range(1000):
1247             ss.add_latency("renew", 1.0 * i)
1248         for i in range(10):
1249             ss.add_latency("cancel", 2.0 * i)
1250         ss.add_latency("get", 5.0)
1251
1252         output = ss.get_latencies()
1253
1254         self.failUnlessEqual(sorted(output.keys()),
1255                              sorted(["allocate", "renew", "cancel", "get"]))
1256         self.failUnlessEqual(len(ss.latencies["allocate"]), 1000)
1257         self.failUnless(abs(output["allocate"]["mean"] - 9500) < 1)
1258         self.failUnless(abs(output["allocate"]["01_0_percentile"] - 9010) < 1)
1259         self.failUnless(abs(output["allocate"]["10_0_percentile"] - 9100) < 1)
1260         self.failUnless(abs(output["allocate"]["50_0_percentile"] - 9500) < 1)
1261         self.failUnless(abs(output["allocate"]["90_0_percentile"] - 9900) < 1)
1262         self.failUnless(abs(output["allocate"]["95_0_percentile"] - 9950) < 1)
1263         self.failUnless(abs(output["allocate"]["99_0_percentile"] - 9990) < 1)
1264         self.failUnless(abs(output["allocate"]["99_9_percentile"] - 9999) < 1)
1265
1266         self.failUnlessEqual(len(ss.latencies["renew"]), 1000)
1267         self.failUnless(abs(output["renew"]["mean"] - 500) < 1)
1268         self.failUnless(abs(output["renew"]["01_0_percentile"] -  10) < 1)
1269         self.failUnless(abs(output["renew"]["10_0_percentile"] - 100) < 1)
1270         self.failUnless(abs(output["renew"]["50_0_percentile"] - 500) < 1)
1271         self.failUnless(abs(output["renew"]["90_0_percentile"] - 900) < 1)
1272         self.failUnless(abs(output["renew"]["95_0_percentile"] - 950) < 1)
1273         self.failUnless(abs(output["renew"]["99_0_percentile"] - 990) < 1)
1274         self.failUnless(abs(output["renew"]["99_9_percentile"] - 999) < 1)
1275
1276         self.failUnlessEqual(len(ss.latencies["cancel"]), 10)
1277         self.failUnless(abs(output["cancel"]["mean"] - 9) < 1)
1278         self.failUnless(abs(output["cancel"]["01_0_percentile"] -  0) < 1)
1279         self.failUnless(abs(output["cancel"]["10_0_percentile"] -  2) < 1)
1280         self.failUnless(abs(output["cancel"]["50_0_percentile"] - 10) < 1)
1281         self.failUnless(abs(output["cancel"]["90_0_percentile"] - 18) < 1)
1282         self.failUnless(abs(output["cancel"]["95_0_percentile"] - 18) < 1)
1283         self.failUnless(abs(output["cancel"]["99_0_percentile"] - 18) < 1)
1284         self.failUnless(abs(output["cancel"]["99_9_percentile"] - 18) < 1)
1285
1286         self.failUnlessEqual(len(ss.latencies["get"]), 1)
1287         self.failUnless(abs(output["get"]["mean"] - 5) < 1)
1288         self.failUnless(abs(output["get"]["01_0_percentile"] - 5) < 1)
1289         self.failUnless(abs(output["get"]["10_0_percentile"] - 5) < 1)
1290         self.failUnless(abs(output["get"]["50_0_percentile"] - 5) < 1)
1291         self.failUnless(abs(output["get"]["90_0_percentile"] - 5) < 1)
1292         self.failUnless(abs(output["get"]["95_0_percentile"] - 5) < 1)
1293         self.failUnless(abs(output["get"]["99_0_percentile"] - 5) < 1)
1294         self.failUnless(abs(output["get"]["99_9_percentile"] - 5) < 1)
1295
1296 def remove_tags(s):
1297     s = re.sub(r'<[^>]*>', ' ', s)
1298     s = re.sub(r'\s+', ' ', s)
1299     return s
1300
1301 class BucketCounter(unittest.TestCase, pollmixin.PollMixin):
1302
1303     def setUp(self):
1304         self.s = service.MultiService()
1305         self.s.startService()
1306     def tearDown(self):
1307         return self.s.stopService()
1308
1309     def test_bucket_counter(self):
1310         basedir = "storage/BucketCounter/bucket_counter"
1311         fileutil.make_dirs(basedir)
1312         ss = StorageServer(basedir, "\x00" * 20)
1313         # to make sure we capture the bucket-counting-crawler in the middle
1314         # of a cycle, we reach in and reduce its maximum slice time to 0.
1315         orig_cpu_slice = ss.bucket_counter.cpu_slice
1316         ss.bucket_counter.cpu_slice = 0
1317         ss.setServiceParent(self.s)
1318
1319         w = StorageStatus(ss)
1320
1321         # this sample is before the crawler has started doing anything
1322         html = w.renderSynchronously()
1323         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1324         s = remove_tags(html)
1325         self.failUnless("Accepting new shares: Yes" in s, s)
1326         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1327         self.failUnless("Total buckets: Not computed yet" in s, s)
1328         self.failUnless("Next crawl in" in s, s)
1329
1330         # give the bucket-counting-crawler one tick to get started. The
1331         # cpu_slice=0 will force it to yield right after it processes the
1332         # first prefix
1333
1334         d = eventual.fireEventually()
1335         def _check(ignored):
1336             # are we really right after the first prefix?
1337             state = ss.bucket_counter.get_state()
1338             self.failUnlessEqual(state["last-complete-prefix"],
1339                                  ss.bucket_counter.prefixes[0])
1340             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1341             html = w.renderSynchronously()
1342             s = remove_tags(html)
1343             self.failUnless(" Current crawl " in s, s)
1344             self.failUnless(" (next work in " in s, s)
1345         d.addCallback(_check)
1346
1347         # now give it enough time to complete a full cycle
1348         def _watch():
1349             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1350         d.addCallback(lambda ignored: self.poll(_watch))
1351         def _check2(ignored):
1352             ss.bucket_counter.cpu_slice = orig_cpu_slice
1353             html = w.renderSynchronously()
1354             s = remove_tags(html)
1355             self.failUnless("Total buckets: 0 (the number of" in s, s)
1356             self.failUnless("Next crawl in 359" in s, s) # about 3600-1 seconds
1357         d.addCallback(_check2)
1358         return d
1359
1360     def test_bucket_counter_cleanup(self):
1361         basedir = "storage/BucketCounter/bucket_counter_cleanup"
1362         fileutil.make_dirs(basedir)
1363         ss = StorageServer(basedir, "\x00" * 20)
1364         # to make sure we capture the bucket-counting-crawler in the middle
1365         # of a cycle, we reach in and reduce its maximum slice time to 0.
1366         orig_cpu_slice = ss.bucket_counter.cpu_slice
1367         ss.bucket_counter.cpu_slice = 0
1368         ss.setServiceParent(self.s)
1369
1370         d = eventual.fireEventually()
1371
1372         def _after_first_prefix(ignored):
1373             ss.bucket_counter.cpu_slice = 100.0 # finish as fast as possible
1374             # now sneak in and mess with its state, to make sure it cleans up
1375             # properly at the end of the cycle
1376             state = ss.bucket_counter.state
1377             self.failUnlessEqual(state["last-complete-prefix"],
1378                                  ss.bucket_counter.prefixes[0])
1379             state["share-counts"][-12] = {}
1380             state["storage-index-samples"]["bogusprefix!"] = (-12, [])
1381             ss.bucket_counter.save_state()
1382         d.addCallback(_after_first_prefix)
1383
1384         # now give it enough time to complete a cycle
1385         def _watch():
1386             return not ss.bucket_counter.get_progress()["cycle-in-progress"]
1387         d.addCallback(lambda ignored: self.poll(_watch))
1388         def _check2(ignored):
1389             ss.bucket_counter.cpu_slice = orig_cpu_slice
1390             s = ss.bucket_counter.get_state()
1391             self.failIf(-12 in s["share-counts"], s["share-counts"].keys())
1392             self.failIf("bogusprefix!" in s["storage-index-samples"],
1393                         s["storage-index-samples"].keys())
1394         d.addCallback(_check2)
1395         return d
1396
1397 class NoStatvfsServer(StorageServer):
1398     def do_statvfs(self):
1399         raise AttributeError
1400
1401 class WebStatus(unittest.TestCase, pollmixin.PollMixin):
1402
1403     def setUp(self):
1404         self.s = service.MultiService()
1405         self.s.startService()
1406     def tearDown(self):
1407         return self.s.stopService()
1408
1409     def test_no_server(self):
1410         w = StorageStatus(None)
1411         html = w.renderSynchronously()
1412         self.failUnless("<h1>No Storage Server Running</h1>" in html, html)
1413
1414     def test_status(self):
1415         basedir = "storage/WebStatus/status"
1416         fileutil.make_dirs(basedir)
1417         ss = StorageServer(basedir, "\x00" * 20)
1418         ss.setServiceParent(self.s)
1419         w = StorageStatus(ss)
1420         html = w.renderSynchronously()
1421         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1422         s = remove_tags(html)
1423         self.failUnless("Accepting new shares: Yes" in s, s)
1424         self.failUnless("Reserved space: - 0 B (0)" in s, s)
1425
1426     def test_status_no_statvfs(self):
1427         # windows has no os.statvfs . Make sure the code handles that even on
1428         # unix.
1429         basedir = "storage/WebStatus/status_no_statvfs"
1430         fileutil.make_dirs(basedir)
1431         ss = NoStatvfsServer(basedir, "\x00" * 20)
1432         ss.setServiceParent(self.s)
1433         w = StorageStatus(ss)
1434         html = w.renderSynchronously()
1435         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1436         s = remove_tags(html)
1437         self.failUnless("Accepting new shares: Yes" in s, s)
1438         self.failUnless("Total disk space: ?" in s, s)
1439
1440     def test_readonly(self):
1441         basedir = "storage/WebStatus/readonly"
1442         fileutil.make_dirs(basedir)
1443         ss = StorageServer(basedir, "\x00" * 20, readonly_storage=True)
1444         ss.setServiceParent(self.s)
1445         w = StorageStatus(ss)
1446         html = w.renderSynchronously()
1447         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1448         s = remove_tags(html)
1449         self.failUnless("Accepting new shares: No" in s, s)
1450
1451     def test_reserved(self):
1452         basedir = "storage/WebStatus/reserved"
1453         fileutil.make_dirs(basedir)
1454         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
1455         ss.setServiceParent(self.s)
1456         w = StorageStatus(ss)
1457         html = w.renderSynchronously()
1458         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1459         s = remove_tags(html)
1460         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
1461
1462     def test_huge_reserved(self):
1463         basedir = "storage/WebStatus/reserved"
1464         fileutil.make_dirs(basedir)
1465         ss = StorageServer(basedir, "\x00" * 20, reserved_space=10e6)
1466         ss.setServiceParent(self.s)
1467         w = StorageStatus(ss)
1468         html = w.renderSynchronously()
1469         self.failUnless("<h1>Storage Server Status</h1>" in html, html)
1470         s = remove_tags(html)
1471         self.failUnless("Reserved space: - 10.00 MB (10000000)" in s, s)
1472
1473     def test_util(self):
1474         w = StorageStatus(None)
1475         self.failUnlessEqual(w.render_space(None, None), "?")
1476         self.failUnlessEqual(w.render_space(None, 10e6), "10.00 MB (10000000)")
1477         self.failUnlessEqual(remove_prefix("foo.bar", "foo."), "bar")
1478         self.failUnlessEqual(remove_prefix("foo.bar", "baz."), None)
1479