2 from twisted.trial import unittest
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap import Referenceable
9 from allmydata import interfaces
10 from allmydata.util import fileutil, hashutil
11 from allmydata.storage import BucketWriter, BucketReader, \
12 WriteBucketProxy, ReadBucketProxy, StorageServer
# Unit tests for the low-level BucketWriter/BucketReader objects, which
# store a single share on disk (writes go to an "incoming" tmp file and
# the data is read back from the "final" location).
# NOTE(review): this listing carries embedded original line numbers and
# has gaps (the bucket_writer_closed body, the bw.remote_close() calls,
# and several blank lines are missing) -- confirm against the original file.
14 class Bucket(unittest.TestCase):
# Create ./storage/Bucket/<name>/ plus a tmp/ subdirectory and return the
# (incoming, final) file paths that BucketWriter expects.
15 def make_workdir(self, name):
16 basedir = os.path.join("storage", "Bucket", name)
17 incoming = os.path.join(basedir, "tmp", "bucket")
18 final = os.path.join(basedir, "bucket")
19 fileutil.make_dirs(basedir)
20 fileutil.make_dirs(os.path.join(basedir, "tmp"))
21 return incoming, final
# BucketWriter calls back into its "server" argument (here the test case
# itself, per the BucketWriter(self, ...) calls below) when a bucket is
# closed; the body is missing from this listing -- presumably a no-op.
23 def bucket_writer_closed(self, bw, consumed):
# Write 82 bytes into a 200-byte bucket at increasing offsets.
# NOTE(review): the close/teardown tail of this test is missing here.
26 def test_create(self):
27 incoming, final = self.make_workdir("test_create")
28 bw = BucketWriter(self, incoming, final, 200)
29 bw.remote_write(0, "a"*25)
30 bw.remote_write(25, "b"*25)
31 bw.remote_write(50, "c"*25)
32 bw.remote_write(75, "d"*7)
# Round trip: write three runs of bytes, then read them back through a
# BucketReader on the final path and compare byte-for-byte.
35 def test_readwrite(self):
36 incoming, final = self.make_workdir("test_readwrite")
37 bw = BucketWriter(self, incoming, final, 200)
38 bw.remote_write(0, "a"*25)
39 bw.remote_write(25, "b"*25)
40 bw.remote_write(50, "c"*7) # last block may be short
# NOTE(review): the bw.remote_close() call that would presumably move the
# file from incoming/ to final appears to be missing from this listing.
44 br = BucketReader(final)
45 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
46 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
47 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
# Fragment of a local stand-in for a foolscap RemoteReference: the
# enclosing class header (presumably "class RemoteBucket") is outside this
# listing, and the inner "def _call():" line referenced by maybeDeferred
# is missing as well -- confirm against the original file.
# callRemote maps callRemote("foo", ...) onto self.target.remote_foo(...),
# wrapping the synchronous call in a Deferred via defer.maybeDeferred so
# that both results and exceptions travel through the Deferred, matching
# real remote-call semantics.
51 def callRemote(self, methname, *args, **kwargs):
53 meth = getattr(self.target, "remote_" + methname)
54 return meth(*args, **kwargs)
55 return defer.maybeDeferred(_call)
# Tests for the Write/ReadBucketProxy layer, which lays structured share
# data (data blocks, hash lists, the URI extension) out at fixed offsets
# inside a flat bucket, and reads the fields back.
# NOTE(review): this listing has embedded line numbers and missing lines
# (make_bucket's return, most WriteBucketProxy constructor arguments, the
# "for i in range(...)" comprehension tails, "d = bp.start()", and the
# "return d1"/"return d" lines) -- confirm against the original file.
57 class BucketProxy(unittest.TestCase):
# Build a BucketWriter of `size` bytes under ./storage/BucketProxy/<name>.
# Per the callers below it presumably also wraps bw in a RemoteBucket and
# returns (bw, rb, final); that tail is missing from this listing.
58 def make_bucket(self, name, size):
59 basedir = os.path.join("storage", "BucketProxy", name)
60 incoming = os.path.join(basedir, "tmp", "bucket")
61 final = os.path.join(basedir, "bucket")
62 fileutil.make_dirs(basedir)
63 fileutil.make_dirs(os.path.join(basedir, "tmp"))
64 bw = BucketWriter(self, incoming, final, size)
# Close callback required by BucketWriter (the test case acts as the
# server); body missing from this listing -- presumably a no-op.
69 def bucket_writer_closed(self, bw, consumed):
# A freshly constructed WriteBucketProxy must provide the
# IStorageBucketWriter interface.
72 def test_create(self):
73 bw, rb, final = self.make_bucket("test_create", 500)
74 bp = WriteBucketProxy(rb,
# NOTE(review): the intervening constructor keyword arguments
# (segment/block sizes, hash counts, etc.) are missing from this listing.
79 uri_extension_size=500)
80 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
# Full round trip through the proxies: write 4 data blocks, three hash
# lists, the share-hash chain and the URI extension, close, then read
# every field back via a ReadBucketProxy and compare for equality.
82 def test_readwrite(self):
83 # Let's pretend each share has 100 bytes of data, and that there are
84 # 4 segments (25 bytes each), and 8 shares total. So the three
85 # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
86 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
87 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
88 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
89 # long. That should make the whole share:
91 # 0x1c + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1406 bytes long
93 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
95 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
97 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
99 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
# NOTE(review): the "for i in range(...)]" continuation of each of the
# four comprehensions above is missing from this listing.
101 uri_extension = "s" + "E"*498 + "e"
103 bw, rb, final = self.make_bucket("test_readwrite", 1406)
104 bp = WriteBucketProxy(rb,
# NOTE(review): constructor keyword arguments and the "d = bp.start()"
# line that creates the Deferred chained below are missing here.
109 uri_extension_size=len(uri_extension))
# Each put_* schedules one proxy write; the chain preserves write order.
112 d.addCallback(lambda res: bp.put_block(0, "a"*25))
113 d.addCallback(lambda res: bp.put_block(1, "b"*25))
114 d.addCallback(lambda res: bp.put_block(2, "c"*25))
115 d.addCallback(lambda res: bp.put_block(3, "d"*20))
116 d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
117 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
118 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
119 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
120 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
121 d.addCallback(lambda res: bp.close())
123 # now read everything back
124 def _start_reading(res):
# NOTE(review): br is not visibly used; the missing lines 126-127
# presumably retarget rb at this BucketReader -- confirm.
125 br = BucketReader(final)
128 rbp = ReadBucketProxy(rb)
129 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
# Alternating get/check pairs: each get_* result (`res`) feeds the
# following failUnlessEqual callback.
131 d1 = rbp.startIfNecessary()
132 d1.addCallback(lambda res: rbp.get_block(0))
133 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
134 d1.addCallback(lambda res: rbp.get_block(1))
135 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
136 d1.addCallback(lambda res: rbp.get_block(2))
137 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
138 d1.addCallback(lambda res: rbp.get_block(3))
139 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
141 d1.addCallback(lambda res: rbp.get_plaintext_hashes())
142 d1.addCallback(lambda res:
143 self.failUnlessEqual(res, plaintext_hashes))
144 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
145 d1.addCallback(lambda res:
146 self.failUnlessEqual(res, crypttext_hashes))
147 d1.addCallback(lambda res: rbp.get_block_hashes())
148 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
149 d1.addCallback(lambda res: rbp.get_share_hashes())
150 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
151 d1.addCallback(lambda res: rbp.get_uri_extension())
152 d1.addCallback(lambda res:
153 self.failUnlessEqual(res, uri_extension))
# NOTE(review): the "return d1" inside _start_reading and the final
# "return d" of the test are missing from this listing.
157 d.addCallback(_start_reading)
# Tests for the StorageServer service itself: bucket allocation,
# provisional size accounting under a sizelimit, and lease
# add/renew/cancel semantics.
# NOTE(review): this listing has embedded line numbers and missing lines
# throughout (the setUp/tearDown/workdir "def" headers and returns, the
# sharenums/size definitions in test_leases, and the wb.remote_close()
# loops) -- confirm against the original file. The class also appears to
# continue past the end of this view. Code is Python 2 (string shares,
# iterator .next()).
163 class Server(unittest.TestCase):
# (setUp) parent MultiService plus a counter used to derive unique lease
# secrets; the "def setUp(self):" header is missing from this listing.
166 self.sparent = service.MultiService()
167 self._secret = itertools.count()
# (tearDown) stop every child service; the "def tearDown(self):" header
# is missing from this listing.
169 return self.sparent.stopService()
# Per-test working directory under ./storage/Server/<name>.
171 def workdir(self, name):
172 basedir = os.path.join("storage", "Server", name)
# NOTE(review): the "return basedir" line appears to be missing here.
# Build a StorageServer rooted in the test workdir, optionally bounded by
# `sizelimit` bytes, and attach it to the parent service. Presumably
# returns ss (callers use the result); the return line is missing here.
175 def create(self, name, sizelimit=None):
176 workdir = self.workdir(name)
177 ss = StorageServer(workdir, sizelimit)
178 ss.setServiceParent(self.sparent)
# Smoke test: constructing a server must not raise.
181 def test_create(self):
182 ss = self.create("test_create")
# Helper: allocate `sharenums` buckets of `size` bytes under
# `storage_index`, minting fresh renew/cancel secrets from the counter so
# every allocation holds a distinct lease. Returns the server's
# (already, writers) pair. The Referenceable() is the canary argument.
184 def allocate(self, ss, storage_index, sharenums, size):
185 renew_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
186 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._secret.next())
187 return ss.remote_allocate_buckets(storage_index,
188 renew_secret, cancel_secret,
189 sharenums, size, Referenceable())
# Allocation lifecycle: open buckets are not readable, closed buckets
# are, and re-allocation reports existing/in-progress shares as "already".
191 def test_allocate(self):
192 ss = self.create("test_allocate")
194 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
# NOTE(review): `canary` is never used below (allocate() builds its own
# Referenceable) -- looks like a leftover local; confirm.
196 canary = Referenceable()
197 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
198 self.failUnlessEqual(already, set())
199 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
201 # while the buckets are open, they should not count as readable
202 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
204 for i,wb in writers.items():
205 wb.remote_write(0, "%25d" % i)
# NOTE(review): the wb.remote_close() call for each writer (original
# lines 206-207) is missing from this listing.
208 # now they should be readable
209 b = ss.remote_get_buckets("vid")
210 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
211 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
213 # now if we about writing again, the server should offer those three
214 # buckets as already present
215 already,writers = self.allocate(ss, "vid", [0,1,2,3,4], 75)
216 self.failUnlessEqual(already, set([0,1,2]))
217 self.failUnlessEqual(set(writers.keys()), set([3,4]))
219 # while those two buckets are open for writing, the server should
220 # tell new uploaders that they already exist (so that we don't try to
221 # upload into them a second time)
223 already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
224 self.failUnlessEqual(already, set([2,3,4]))
225 self.failUnlessEqual(set(writers.keys()), set([5]))
# Sizelimit accounting: provisional allocations count against the limit,
# abandoning writers releases their reservation, closing converts it to
# long-term usage, and usage survives a server restart.
227 def test_sizelimits(self):
228 ss = self.create("test_sizelimits", 100)
# NOTE(review): `canary` unused here as well -- confirm leftover.
229 canary = Referenceable()
231 already,writers = self.allocate(ss, "vid1", [0,1,2], 25)
232 self.failUnlessEqual(len(writers), 3)
233 # now the StorageServer should have 75 bytes provisionally allocated,
234 # allowing only 25 more to be claimed
235 self.failUnlessEqual(len(ss._active_writers), 3)
237 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 25)
238 self.failUnlessEqual(len(writers2), 1)
239 self.failUnlessEqual(len(ss._active_writers), 4)
241 # we abandon the first set, so their provisional allocation should be
# NOTE(review): the abandon step itself (original lines 242-244,
# presumably remote_abort or del) is missing from this listing.
245 self.failUnlessEqual(len(ss._active_writers), 1)
247 # and we close the second set, so their provisional allocation should
248 # become real, long-term allocation
249 for bw in writers2.values():
250 bw.remote_write(0, "a"*25)
# NOTE(review): the bw.remote_close() call (original lines 251-254) is
# missing from this listing.
255 self.failUnlessEqual(len(ss._active_writers), 0)
257 # now there should be 25 bytes allocated, and 75 free
258 already3,writers3 = self.allocate(ss,"vid3", [0,1,2,3], 25)
259 self.failUnlessEqual(len(writers3), 3)
260 self.failUnlessEqual(len(ss._active_writers), 3)
# NOTE(review): the abandon step for writers3 (original lines 261-263)
# is missing from this listing.
264 self.failUnlessEqual(len(ss._active_writers), 0)
265 ss.disownServiceParent()
268 # creating a new StorageServer in the same directory should see the
269 # same usage. note that metadata will be counted at startup but not
270 # during runtime, so if we were creating any metadata, the allocation
271 # would be more than 25 bytes and this test would need to be changed.
272 ss = self.create("test_sizelimits", 100)
273 already4,writers4 = self.allocate(ss, "vid4", [0,1,2,3], 25)
274 self.failUnlessEqual(len(writers4), 3)
275 self.failUnlessEqual(len(ss._active_writers), 3)
# Lease semantics: each allocation adds a lease keyed by its
# renew/cancel secrets; renewing or cancelling with the wrong secret
# raises IndexError, and a share stays readable until its last lease
# is cancelled.
277 def test_leases(self):
278 ss = self.create("test_leases")
# NOTE(review): unlike the tests above, `canary` IS used here (passed to
# remote_allocate_buckets directly).
279 canary = Referenceable()
# NOTE(review): the `sharenums` and `size` definitions (original lines
# 280-282, presumably range(5) and a byte count) are missing here.
283 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
284 hashutil.tagged_hash("blah", "%d" % self._secret.next()))
285 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
286 sharenums, size, canary)
287 self.failUnlessEqual(len(already), 0)
288 self.failUnlessEqual(len(writers), 5)
289 for wb in writers.values():
# NOTE(review): wb.remote_close() (original line 290) is missing here.
292 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
293 hashutil.tagged_hash("blah", "%d" % self._secret.next()))
294 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
295 sharenums, size, canary)
296 for wb in writers.values():
# NOTE(review): wb.remote_close() (original line 297) is missing here.
299 # take out a second lease on si1
300 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._secret.next()),
301 hashutil.tagged_hash("blah", "%d" % self._secret.next()))
302 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
303 sharenums, size, canary)
# all 5 shares already exist; the new allocation just adds a lease.
304 self.failUnlessEqual(len(already), 5)
305 self.failUnlessEqual(len(writers), 0)
307 # check that si0 is readable
308 readers = ss.remote_get_buckets("si0")
309 self.failUnlessEqual(len(readers), 5)
311 # renew the first lease. Only the proper renew_secret should work
312 ss.remote_renew_lease("si0", rs0)
313 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
314 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
316 # check that si0 is still readable
317 readers = ss.remote_get_buckets("si0")
318 self.failUnlessEqual(len(readers), 5)
# cancel requires the matching cancel_secret, not the renew secret or a
# secret from a different lease.
321 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
322 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
323 ss.remote_cancel_lease("si0", cs0)
325 # si0 should now be gone
326 readers = ss.remote_get_buckets("si0")
327 self.failUnlessEqual(len(readers), 0)
328 # and the renew should no longer work
329 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
332 # cancel the first lease on si1, leaving the second in place
333 ss.remote_cancel_lease("si1", cs1)
334 readers = ss.remote_get_buckets("si1")
335 self.failUnlessEqual(len(readers), 5)
336 # the corresponding renew should no longer work
337 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
339 ss.remote_renew_lease("si1", rs2)
340 # cancelling the second should make it go away
341 ss.remote_cancel_lease("si1", cs2)
342 readers = ss.remote_get_buckets("si1")
343 self.failUnlessEqual(len(readers), 0)
344 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
345 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)