import itertools
import time, os.path, stat

from twisted.trial import unittest
from twisted.application import service
from twisted.internet import defer
from foolscap import Referenceable

from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer
14 class Bucket(unittest.TestCase):
15 def make_workdir(self, name):
16 basedir = os.path.join("storage", "Bucket", name)
17 incoming = os.path.join(basedir, "tmp", "bucket")
18 final = os.path.join(basedir, "bucket")
19 fileutil.make_dirs(basedir)
20 fileutil.make_dirs(os.path.join(basedir, "tmp"))
21 return incoming, final
23 def bucket_writer_closed(self, bw, consumed):
28 renew_secret = os.urandom(32)
29 cancel_secret = os.urandom(32)
30 expiration_time = time.time() + 5000
31 return (owner_num, renew_secret, cancel_secret, expiration_time)
33 def test_create(self):
34 incoming, final = self.make_workdir("test_create")
35 bw = BucketWriter(self, incoming, final, 200, self.make_lease())
36 bw.remote_write(0, "a"*25)
37 bw.remote_write(25, "b"*25)
38 bw.remote_write(50, "c"*25)
39 bw.remote_write(75, "d"*7)
42 def test_readwrite(self):
43 incoming, final = self.make_workdir("test_readwrite")
44 bw = BucketWriter(self, incoming, final, 200, self.make_lease())
45 bw.remote_write(0, "a"*25)
46 bw.remote_write(25, "b"*25)
47 bw.remote_write(50, "c"*7) # last block may be short
51 br = BucketReader(final)
52 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
53 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
54 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
58 def callRemote(self, methname, *args, **kwargs):
60 meth = getattr(self.target, "remote_" + methname)
61 return meth(*args, **kwargs)
62 return defer.maybeDeferred(_call)
64 class BucketProxy(unittest.TestCase):
65 def make_bucket(self, name, size):
66 basedir = os.path.join("storage", "BucketProxy", name)
67 incoming = os.path.join(basedir, "tmp", "bucket")
68 final = os.path.join(basedir, "bucket")
69 fileutil.make_dirs(basedir)
70 fileutil.make_dirs(os.path.join(basedir, "tmp"))
71 bw = BucketWriter(self, incoming, final, size, self.make_lease())
78 renew_secret = os.urandom(32)
79 cancel_secret = os.urandom(32)
80 expiration_time = time.time() + 5000
81 return (owner_num, renew_secret, cancel_secret, expiration_time)
83 def bucket_writer_closed(self, bw, consumed):
86 def test_create(self):
87 bw, rb, final = self.make_bucket("test_create", 500)
88 bp = WriteBucketProxy(rb,
93 uri_extension_size=500)
94 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
96 def test_readwrite(self):
97 # Let's pretend each share has 100 bytes of data, and that there are
98 # 4 segments (25 bytes each), and 8 shares total. So the three
99 # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
100 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
101 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
102 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
103 # long. That should make the whole share:
105 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
107 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
109 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
111 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
113 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
115 uri_extension = "s" + "E"*498 + "e"
117 bw, rb, final = self.make_bucket("test_readwrite", 1414)
118 bp = WriteBucketProxy(rb,
123 uri_extension_size=len(uri_extension))
126 d.addCallback(lambda res: bp.put_block(0, "a"*25))
127 d.addCallback(lambda res: bp.put_block(1, "b"*25))
128 d.addCallback(lambda res: bp.put_block(2, "c"*25))
129 d.addCallback(lambda res: bp.put_block(3, "d"*20))
130 d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
131 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
132 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
133 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
134 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
135 d.addCallback(lambda res: bp.close())
137 # now read everything back
138 def _start_reading(res):
139 br = BucketReader(final)
142 rbp = ReadBucketProxy(rb)
143 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
145 d1 = rbp.startIfNecessary()
146 d1.addCallback(lambda res: rbp.get_block(0))
147 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
148 d1.addCallback(lambda res: rbp.get_block(1))
149 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
150 d1.addCallback(lambda res: rbp.get_block(2))
151 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
152 d1.addCallback(lambda res: rbp.get_block(3))
153 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
155 d1.addCallback(lambda res: rbp.get_plaintext_hashes())
156 d1.addCallback(lambda res:
157 self.failUnlessEqual(res, plaintext_hashes))
158 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
159 d1.addCallback(lambda res:
160 self.failUnlessEqual(res, crypttext_hashes))
161 d1.addCallback(lambda res: rbp.get_block_hashes())
162 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
163 d1.addCallback(lambda res: rbp.get_share_hashes())
164 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
165 d1.addCallback(lambda res: rbp.get_uri_extension())
166 d1.addCallback(lambda res:
167 self.failUnlessEqual(res, uri_extension))
171 d.addCallback(_start_reading)
177 class Server(unittest.TestCase):
180 self.sparent = service.MultiService()
181 self._secret = itertools.count()
183 return self.sparent.stopService()
185 def workdir(self, name):
186 basedir = os.path.join("storage", "Server", name)
189 def create(self, name, sizelimit=None):
190 workdir = self.workdir(name)
191 ss = StorageServer(workdir, sizelimit)
192 ss.setServiceParent(self.sparent)
195 def test_create(self):
196 ss = self.create("test_create")
198 def allocate(self, ss, storage_index, sharenums, size):
199 renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
200 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
201 return ss.remote_allocate_buckets(storage_index,
202 renew_secret, cancel_secret,
203 sharenums, size, Referenceable())
205 def test_remove_incoming(self):
206 ss = self.create("test_remove_incoming")
207 already, writers = self.allocate(ss, "vid", range(3), 10)
208 for i,wb in writers.items():
209 wb.remote_write(0, "%10d" % i)
211 incomingdir = os.path.join(self.workdir("test_remove_incoming"),
212 "shares", "incoming")
213 leftover_dirs = os.listdir(incomingdir)
214 self.failUnlessEqual(leftover_dirs, [])
216 def test_allocate(self):
217 ss = self.create("test_allocate")
219 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
221 canary = Referenceable()
222 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
223 self.failUnlessEqual(already, set())
224 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
226 # while the buckets are open, they should not count as readable
227 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
229 for i,wb in writers.items():
230 wb.remote_write(0, "%25d" % i)
233 # now they should be readable
234 b = ss.remote_get_buckets("vid")
235 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
236 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
238 # now if we about writing again, the server should offer those three
239 # buckets as already present
240 already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75)
241 self.failUnlessEqual(already, set([0,1,2]))
242 self.failUnlessEqual(set(writers.keys()), set([3,4]))
244 # while those two buckets are open for writing, the server should
245 # tell new uploaders that they already exist (so that we don't try to
246 # upload into them a second time)
248 already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
249 self.failUnlessEqual(already, set([2,3,4]))
250 self.failUnlessEqual(set(writers.keys()), set([5]))
252 def test_sizelimits(self):
253 ss = self.create("test_sizelimits", 5000)
254 canary = Referenceable()
255 # a newly created and filled share incurs this much overhead, beyond
256 # the size we request.
258 LEASE_SIZE = 4+32+32+4
260 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
261 self.failUnlessEqual(len(writers), 3)
262 # now the StorageServer should have 3000 bytes provisionally
263 # allocated, allowing only 2000 more to be claimed
264 self.failUnlessEqual(len(ss._active_writers), 3)
266 # allocating 1001-byte shares only leaves room for one
267 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
268 self.failUnlessEqual(len(writers2), 1)
269 self.failUnlessEqual(len(ss._active_writers), 4)
271 # we abandon the first set, so their provisional allocation should be
275 self.failUnlessEqual(len(ss._active_writers), 1)
276 # now we have a provisional allocation of 1001 bytes
278 # and we close the second set, so their provisional allocation should
279 # become real, long-term allocation, and grows to include the
281 for bw in writers2.values():
282 bw.remote_write(0, "a"*25)
287 self.failUnlessEqual(len(ss._active_writers), 0)
289 allocated = 1001 + OVERHEAD + LEASE_SIZE
290 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
291 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
292 already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
293 self.failUnlessEqual(len(writers3), 39)
294 self.failUnlessEqual(len(ss._active_writers), 39)
298 self.failUnlessEqual(len(ss._active_writers), 0)
299 ss.disownServiceParent()
302 # creating a new StorageServer in the same directory should see the
305 # metadata that goes into the share file is counted upon share close,
306 # as well as at startup. metadata that goes into other files will not
307 # be counted until the next startup, so if we were creating any
308 # extra-file metadata, the allocation would be more than 'allocated'
309 # and this test would need to be changed.
310 ss = self.create("test_sizelimits", 5000)
311 already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
312 self.failUnlessEqual(len(writers4), 39)
313 self.failUnlessEqual(len(ss._active_writers), 39)
316 basedir = self.workdir("test_seek_behavior")
317 fileutil.make_dirs(basedir)
318 filename = os.path.join(basedir, "testfile")
319 f = open(filename, "wb")
322 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
323 # files. mode="a" preserves previous contents but does not allow
324 # seeking-to-create-holes. mode="r+" allows both.
325 f = open(filename, "rb+")
329 filelen = os.stat(filename)[stat.ST_SIZE]
330 self.failUnlessEqual(filelen, 100+3)
331 f2 = open(filename, "rb")
332 self.failUnlessEqual(f2.read(5), "start")
335 def test_leases(self):
336 ss = self.create("test_leases")
337 canary = Referenceable()
341 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
342 hashutil.tagged_hash("blah", "%d" % self._secret.next()))
343 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
344 sharenums, size, canary)
345 self.failUnlessEqual(len(already), 0)
346 self.failUnlessEqual(len(writers), 5)
347 for wb in writers.values():
350 leases = list(ss.get_leases("si0"))
351 self.failUnlessEqual(len(leases), 1)
352 self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
354 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
355 hashutil.tagged_hash("blah", "%d" % self._secret.next()))
356 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
357 sharenums, size, canary)
358 for wb in writers.values():
361 # take out a second lease on si1
362 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
363 hashutil.tagged_hash("blah", "%d" % self._secret.next()))
364 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
365 sharenums, size, canary)
366 self.failUnlessEqual(len(already), 5)
367 self.failUnlessEqual(len(writers), 0)
369 leases = list(ss.get_leases("si1"))
370 self.failUnlessEqual(len(leases), 2)
371 self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
373 # check that si0 is readable
374 readers = ss.remote_get_buckets("si0")
375 self.failUnlessEqual(len(readers), 5)
377 # renew the first lease. Only the proper renew_secret should work
378 ss.remote_renew_lease("si0", rs0)
379 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
380 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
382 # check that si0 is still readable
383 readers = ss.remote_get_buckets("si0")
384 self.failUnlessEqual(len(readers), 5)
387 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
388 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
389 ss.remote_cancel_lease("si0", cs0)
391 # si0 should now be gone
392 readers = ss.remote_get_buckets("si0")
393 self.failUnlessEqual(len(readers), 0)
394 # and the renew should no longer work
395 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
398 # cancel the first lease on si1, leaving the second in place
399 ss.remote_cancel_lease("si1", cs1)
400 readers = ss.remote_get_buckets("si1")
401 self.failUnlessEqual(len(readers), 5)
402 # the corresponding renew should no longer work
403 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
405 leases = list(ss.get_leases("si1"))
406 self.failUnlessEqual(len(leases), 1)
407 self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
409 ss.remote_renew_lease("si1", rs2)
410 # cancelling the second should make it go away
411 ss.remote_cancel_lease("si1", cs2)
412 readers = ss.remote_get_buckets("si1")
413 self.failUnlessEqual(len(readers), 0)
414 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
415 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
417 leases = list(ss.get_leases("si1"))
418 self.failUnlessEqual(len(leases), 0)
421 # test overlapping uploads
422 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
423 hashutil.tagged_hash("blah", "%d" % self._secret.next()))
424 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
425 hashutil.tagged_hash("blah", "%d" % self._secret.next()))
426 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
427 sharenums, size, canary)
428 self.failUnlessEqual(len(already), 0)
429 self.failUnlessEqual(len(writers), 5)
430 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
431 sharenums, size, canary)
432 self.failUnlessEqual(len(already2), 5)
433 self.failUnlessEqual(len(writers2), 0)
434 for wb in writers.values():
437 leases = list(ss.get_leases("si3"))
438 self.failUnlessEqual(len(leases), 2)