2 from twisted.trial import unittest
4 from twisted.application import service
5 from twisted.internet import defer
6 from foolscap import Referenceable
7 import time, os.path, stat
9 from allmydata import interfaces
10 from allmydata.util import fileutil, hashutil, idlib
11 from allmydata.storage import BucketWriter, BucketReader, \
12 WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile
13 from allmydata.interfaces import BadWriteEnablerError
# Tests for immutable-share BucketWriter/BucketReader behavior.
# NOTE(review): this chunk is a line-number-prefixed extract with some
# original lines missing (numbering jumps); code is left byte-identical and
# only comments are added.
15 class Bucket(unittest.TestCase):
# Build and return (incoming, final) paths for one test bucket under
# storage/Bucket/<name>, creating the base and tmp directories.
16 def make_workdir(self, name):
17 basedir = os.path.join("storage", "Bucket", name)
18 incoming = os.path.join(basedir, "tmp", "bucket")
19 final = os.path.join(basedir, "bucket")
20 fileutil.make_dirs(basedir)
21 fileutil.make_dirs(os.path.join(basedir, "tmp"))
22 return incoming, final
# Callback hook: the TestCase itself is passed to BucketWriter as its
# "server"; the body is not visible in this extract.
24 def bucket_writer_closed(self, bw, consumed):
# Lease-tuple construction (the enclosing `def make_lease` line and the
# owner_num assignment are missing from this extract): random 32-byte
# secrets plus an expiry ~5000 seconds out.
29 renew_secret = os.urandom(32)
30 cancel_secret = os.urandom(32)
31 expiration_time = time.time() + 5000
32 return (owner_num, renew_secret, cancel_secret, expiration_time)
# Write 82 bytes into a 200-byte BucketWriter in four chunks; the
# closing/cleanup lines are missing from this extract.
34 def test_create(self):
35 incoming, final = self.make_workdir("test_create")
36 bw = BucketWriter(self, incoming, final, 200, self.make_lease())
37 bw.remote_write(0, "a"*25)
38 bw.remote_write(25, "b"*25)
39 bw.remote_write(50, "c"*25)
# last chunk is short (7 bytes); total written = 82 of the 200 allocated
40 bw.remote_write(75, "d"*7)
# Round-trip: write three chunks via BucketWriter, then read the same
# byte ranges back via BucketReader. The close/transition lines between
# writing and reading are missing from this extract.
43 def test_readwrite(self):
44 incoming, final = self.make_workdir("test_readwrite")
45 bw = BucketWriter(self, incoming, final, 200, self.make_lease())
46 bw.remote_write(0, "a"*25)
47 bw.remote_write(25, "b"*25)
48 bw.remote_write(50, "c"*7) # last block may be short
# read back from the finalized share location
52 br = BucketReader(final)
53 self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
54 self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
55 self.failUnlessEqual(br.remote_read(50, 7), "c"*7)
# Fake-Foolscap shim (the enclosing class header is outside this extract):
# dispatch "callRemote('foo', ...)" to self.target.remote_foo(...), wrapped
# in a Deferred. The inner `def _call():` line is missing from this extract.
59 def callRemote(self, methname, *args, **kwargs):
61 meth = getattr(self.target, "remote_" + methname)
62 return meth(*args, **kwargs)
# maybeDeferred turns a synchronous result or exception into a Deferred
63 return defer.maybeDeferred(_call)
# Tests for the Write/ReadBucketProxy layer that sits on top of buckets.
65 class BucketProxy(unittest.TestCase):
# Create a BucketWriter of the given size under storage/BucketProxy/<name>.
# The trailing lines (wrapping bw in a remote stub and the return) are
# missing from this extract.
66 def make_bucket(self, name, size):
67 basedir = os.path.join("storage", "BucketProxy", name)
68 incoming = os.path.join(basedir, "tmp", "bucket")
69 final = os.path.join(basedir, "bucket")
70 fileutil.make_dirs(basedir)
71 fileutil.make_dirs(os.path.join(basedir, "tmp"))
72 bw = BucketWriter(self, incoming, final, size, self.make_lease())
# Lease-tuple construction (the `def make_lease` line and owner_num
# assignment are missing from this extract), mirroring Bucket.make_lease.
79 renew_secret = os.urandom(32)
80 cancel_secret = os.urandom(32)
81 expiration_time = time.time() + 5000
82 return (owner_num, renew_secret, cancel_secret, expiration_time)
# Callback hook required because this TestCase poses as the bucket's
# server; body not visible in this extract.
84 def bucket_writer_closed(self, bw, consumed):
# A freshly constructed WriteBucketProxy must provide the
# IStorageBucketWriter interface. The middle constructor arguments
# (segment/block sizing) are missing from this extract.
87 def test_create(self):
88 bw, rb, final = self.make_bucket("test_create", 500)
89 bp = WriteBucketProxy(rb,
94 uri_extension_size=500)
95 self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))
# Full proxy round-trip: write blocks, hash trees and the URI extension
# through a WriteBucketProxy, then read each piece back through a
# ReadBucketProxy and compare. Several lines (list-comprehension
# continuations, the WriteBucketProxy sizing args, the `d = bp.start()`
# creation, and the final return) are missing from this extract.
97 def test_readwrite(self):
98 # Let's pretend each share has 100 bytes of data, and that there are
99 # 4 segments (25 bytes each), and 8 shares total. So the three
100 # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
101 # block_hashes) will have 4 leaves and 7 nodes each. The per-share
102 # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
103 # nodes. Furthermore, let's assume the uri_extension is 500 bytes
104 # long. That should make the whole share:
106 # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
# each comprehension's `for i in range(...)]` continuation line is missing
108 plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
110 crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
112 block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
114 share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
116 uri_extension = "s" + "E"*498 + "e"
118 bw, rb, final = self.make_bucket("test_readwrite", 1414)
119 bp = WriteBucketProxy(rb,
124 uri_extension_size=len(uri_extension))
# write everything through the proxy as a Deferred chain
127 d.addCallback(lambda res: bp.put_block(0, "a"*25))
128 d.addCallback(lambda res: bp.put_block(1, "b"*25))
129 d.addCallback(lambda res: bp.put_block(2, "c"*25))
130 d.addCallback(lambda res: bp.put_block(3, "d"*20))
131 d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
132 d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
133 d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
134 d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
135 d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
136 d.addCallback(lambda res: bp.close())
138 # now read everything back
139 def _start_reading(res):
140 br = BucketReader(final)
143 rbp = ReadBucketProxy(rb)
144 self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))
146 d1 = rbp.startIfNecessary()
147 d1.addCallback(lambda res: rbp.get_block(0))
148 d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
149 d1.addCallback(lambda res: rbp.get_block(1))
150 d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
151 d1.addCallback(lambda res: rbp.get_block(2))
152 d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
153 d1.addCallback(lambda res: rbp.get_block(3))
154 d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))
156 d1.addCallback(lambda res: rbp.get_plaintext_hashes())
157 d1.addCallback(lambda res:
158 self.failUnlessEqual(res, plaintext_hashes))
159 d1.addCallback(lambda res: rbp.get_crypttext_hashes())
160 d1.addCallback(lambda res:
161 self.failUnlessEqual(res, crypttext_hashes))
162 d1.addCallback(lambda res: rbp.get_block_hashes())
163 d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
164 d1.addCallback(lambda res: rbp.get_share_hashes())
165 d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
166 d1.addCallback(lambda res: rbp.get_uri_extension())
167 d1.addCallback(lambda res:
168 self.failUnlessEqual(res, uri_extension))
# chain the read phase after the write phase completes
172 d.addCallback(_start_reading)
# Tests for the immutable-share StorageServer API.
178 class Server(unittest.TestCase):
# setUp body (its def line is missing from this extract): a MultiService
# parent plus a counter used to mint unique lease secrets. NOTE(review):
# `itertools` is imported on a line outside this extract.
181 self.sparent = service.MultiService()
182 self._lease_secret = itertools.count()
# tearDown body (def line missing): stop all child services
184 return self.sparent.stopService()
# Per-test working directory under storage/Server/<name>; the
# `return basedir` line is missing from this extract.
186 def workdir(self, name):
187 basedir = os.path.join("storage", "Server", name)
# Create a StorageServer rooted in the test workdir, optionally with a
# size limit; the `return ss` line is missing from this extract.
190 def create(self, name, sizelimit=None):
191 workdir = self.workdir(name)
192 ss = StorageServer(workdir, sizelimit)
193 ss.setServiceParent(self.sparent)
# smoke test: construction alone must succeed
196 def test_create(self):
197 ss = self.create("test_create")
199 def allocate(self, ss, storage_index, sharenums, size):
200 renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
201 cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
202 return ss.remote_allocate_buckets(storage_index,
203 renew_secret, cancel_secret,
204 sharenums, size, Referenceable())
# After shares are written (and, on a missing line, closed), nothing may
# be left behind in the shares/incoming staging directory.
206 def test_remove_incoming(self):
207 ss = self.create("test_remove_incoming")
208 already, writers = self.allocate(ss, "vid", range(3), 10)
209 for i,wb in writers.items():
210 wb.remote_write(0, "%10d" % i)
# (the wb.remote_close() line is missing from this extract)
212 incomingdir = os.path.join(self.workdir("test_remove_incoming"),
213 "shares", "incoming")
214 leftover_dirs = os.listdir(incomingdir)
215 self.failUnlessEqual(leftover_dirs, [])
# Allocation semantics: open buckets are not readable, closed buckets are,
# and re-allocation reports every share the server already holds (open or
# closed) in the 'already' set.
217 def test_allocate(self):
218 ss = self.create("test_allocate")
# unknown storage index -> no buckets
220 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
222 canary = Referenceable()
223 already,writers = self.allocate(ss, "vid", [0,1,2], 75)
224 self.failUnlessEqual(already, set())
225 self.failUnlessEqual(set(writers.keys()), set([0,1,2]))
227 # while the buckets are open, they should not count as readable
228 self.failUnlessEqual(ss.remote_get_buckets("vid"), {})
230 for i,wb in writers.items():
231 wb.remote_write(0, "%25d" % i)
# (the remote_close() loop is missing from this extract)
234 # now they should be readable
235 b = ss.remote_get_buckets("vid")
236 self.failUnlessEqual(set(b.keys()), set([0,1,2]))
237 self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)
239 # now if we ask about writing again, the server should offer those three
240 # buckets as already present. It should offer them even if we don't
241 # ask about those specific ones.
242 already,writers = self.allocate(ss, "vid", [2,3,4], 75)
243 self.failUnlessEqual(already, set([0,1,2]))
244 self.failUnlessEqual(set(writers.keys()), set([3,4]))
246 # while those two buckets are open for writing, the server should
247 # tell new uploaders that they already exist (so that we don't try to
248 # upload into them a second time)
250 already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
251 self.failUnlessEqual(already, set([0,1,2,3,4]))
252 self.failUnlessEqual(set(writers.keys()), set([5]))
# Size-limit accounting: provisional allocations for open writers count
# against the limit, abandoning writers releases space, and a restarted
# server re-derives the same allocation from disk. Several lines (the
# OVERHEAD constant, abort loops and close loops) are missing from this
# extract.
254 def test_sizelimits(self):
255 ss = self.create("test_sizelimits", 5000)
256 canary = Referenceable()
257 # a newly created and filled share incurs this much overhead, beyond
258 # the size we request.
# lease record: ownerid(4) + renew(32) + cancel(32) + expiry(4) = 72 bytes
260 LEASE_SIZE = 4+32+32+4
262 already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
263 self.failUnlessEqual(len(writers), 3)
264 # now the StorageServer should have 3000 bytes provisionally
265 # allocated, allowing only 2000 more to be claimed
266 self.failUnlessEqual(len(ss._active_writers), 3)
268 # allocating 1001-byte shares only leaves room for one
269 already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
270 self.failUnlessEqual(len(writers2), 1)
271 self.failUnlessEqual(len(ss._active_writers), 4)
273 # we abandon the first set, so their provisional allocation should be
# (the abort loop is missing from this extract)
277 self.failUnlessEqual(len(ss._active_writers), 1)
278 # now we have a provisional allocation of 1001 bytes
280 # and we close the second set, so their provisional allocation should
281 # become real, long-term allocation, and grows to include the
283 for bw in writers2.values():
284 bw.remote_write(0, "a"*25)
# (the remote_close() call is missing from this extract)
289 self.failUnlessEqual(len(ss._active_writers), 0)
291 allocated = 1001 + OVERHEAD + LEASE_SIZE
292 # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
293 # 5000-1085=3915 free, therefore we can fit 39 100byte shares
294 already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
295 self.failUnlessEqual(len(writers3), 39)
296 self.failUnlessEqual(len(ss._active_writers), 39)
# (the abort loop for writers3 is missing from this extract)
300 self.failUnlessEqual(len(ss._active_writers), 0)
301 ss.disownServiceParent()
304 # creating a new StorageServer in the same directory should see the
307 # metadata that goes into the share file is counted upon share close,
308 # as well as at startup. metadata that goes into other files will not
309 # be counted until the next startup, so if we were creating any
310 # extra-file metadata, the allocation would be more than 'allocated'
311 # and this test would need to be changed.
312 ss = self.create("test_sizelimits", 5000)
313 already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
314 self.failUnlessEqual(len(writers4), 39)
315 self.failUnlessEqual(len(ss._active_writers), 39)
# Body of test_seek_behavior (its `def` line is missing from this extract):
# document which open() modes allow seeking past EOF to create holes, and
# confirm that writing at offset 100 in "rb+" mode yields a 103-byte file.
318 basedir = self.workdir("test_seek_behavior")
319 fileutil.make_dirs(basedir)
320 filename = os.path.join(basedir, "testfile")
321 f = open(filename, "wb")
# (initial "start" write/close lines are missing from this extract)
324 # mode="w" allows seeking-to-create-holes, but truncates pre-existing
325 # files. mode="a" preserves previous contents but does not allow
326 # seeking-to-create-holes. mode="r+" allows both.
327 f = open(filename, "rb+")
# (the seek(100)/write/close lines are missing from this extract)
331 filelen = os.stat(filename)[stat.ST_SIZE]
332 self.failUnlessEqual(filelen, 100+3)
333 f2 = open(filename, "rb")
334 self.failUnlessEqual(f2.read(5), "start")
# Immutable-share lease lifecycle: one lease per allocation, renewal only
# with the correct renew secret (wrong secrets raise IndexError),
# cancellation of the last lease deletes the shares, and a duplicate
# upload with different secrets adds a second lease instead of new shares.
# The sharenums/size setup lines and the close loops are missing from
# this extract.
337 def test_leases(self):
338 ss = self.create("test_leases")
339 canary = Referenceable()
343 rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
344 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
345 already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
346 sharenums, size, canary)
347 self.failUnlessEqual(len(already), 0)
348 self.failUnlessEqual(len(writers), 5)
349 for wb in writers.values():
# (the wb.remote_close() line is missing from this extract)
352 leases = list(ss.get_leases("si0"))
353 self.failUnlessEqual(len(leases), 1)
# lease tuple index 1 holds the renew secret
354 self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))
356 rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
357 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
358 already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
359 sharenums, size, canary)
360 for wb in writers.values():
# (the wb.remote_close() line is missing from this extract)
363 # take out a second lease on si1
364 rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
365 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
366 already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
367 sharenums, size, canary)
368 self.failUnlessEqual(len(already), 5)
369 self.failUnlessEqual(len(writers), 0)
371 leases = list(ss.get_leases("si1"))
372 self.failUnlessEqual(len(leases), 2)
373 self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))
375 # check that si0 is readable
376 readers = ss.remote_get_buckets("si0")
377 self.failUnlessEqual(len(readers), 5)
379 # renew the first lease. Only the proper renew_secret should work
380 ss.remote_renew_lease("si0", rs0)
381 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
382 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)
384 # check that si0 is still readable
385 readers = ss.remote_get_buckets("si0")
386 self.failUnlessEqual(len(readers), 5)
# cancelling needs the cancel secret; renew/foreign secrets must fail
389 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
390 self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
391 ss.remote_cancel_lease("si0", cs0)
393 # si0 should now be gone
394 readers = ss.remote_get_buckets("si0")
395 self.failUnlessEqual(len(readers), 0)
396 # and the renew should no longer work
397 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)
400 # cancel the first lease on si1, leaving the second in place
401 ss.remote_cancel_lease("si1", cs1)
402 readers = ss.remote_get_buckets("si1")
403 self.failUnlessEqual(len(readers), 5)
404 # the corresponding renew should no longer work
405 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
407 leases = list(ss.get_leases("si1"))
408 self.failUnlessEqual(len(leases), 1)
409 self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))
411 ss.remote_renew_lease("si1", rs2)
412 # cancelling the second should make it go away
413 ss.remote_cancel_lease("si1", cs2)
414 readers = ss.remote_get_buckets("si1")
415 self.failUnlessEqual(len(readers), 0)
416 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
417 self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)
419 leases = list(ss.get_leases("si1"))
420 self.failUnlessEqual(len(leases), 0)
423 # test overlapping uploads
424 rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
425 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
426 rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
427 hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
428 already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
429 sharenums, size, canary)
430 self.failUnlessEqual(len(already), 0)
431 self.failUnlessEqual(len(writers), 5)
# a second upload that races with the first sees its shares as present
432 already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
433 sharenums, size, canary)
434 self.failUnlessEqual(len(already2), 5)
435 self.failUnlessEqual(len(writers2), 0)
436 for wb in writers.values():
# (the wb.remote_close() line is missing from this extract)
# both uploads' leases should be present afterwards
439 leases = list(ss.get_leases("si3"))
440 self.failUnlessEqual(len(leases), 2)
# Tests for the mutable-slot (MutableShareFile) side of StorageServer.
444 class MutableServer(unittest.TestCase):
# setUp body (def line missing from this extract): service parent plus a
# counter for minting unique lease secrets.
447 self.sparent = service.MultiService()
448 self._lease_secret = itertools.count()
# tearDown body (def line missing): stop child services
450 return self.sparent.stopService()
# Per-test working directory; the `return basedir` line is missing from
# this extract.
452 def workdir(self, name):
453 basedir = os.path.join("storage", "MutableServer", name)
# Create a StorageServer with a fixed all-zero node id (mutable-share
# code records the nodeid in leases); `return ss` is missing from this
# extract.
456 def create(self, name, sizelimit=None):
457 workdir = self.workdir(name)
458 ss = StorageServer(workdir, sizelimit)
459 ss.setServiceParent(self.sparent)
460 ss.setNodeID("\x00" * 32)
# smoke test: construction alone must succeed
463 def test_create(self):
464 ss = self.create("test_create")
466 def write_enabler(self, we_tag):
467 return hashutil.tagged_hash("we_blah", we_tag)
469 def renew_secret(self, tag):
470 return hashutil.tagged_hash("renew_blah", str(tag))
472 def cancel_secret(self, tag):
473 return hashutil.tagged_hash("cancel_blah", str(tag))
# Create an empty mutable slot for each of *sharenums* via the combined
# test-and-read-and-write call (empty test vector, empty write vector),
# then assert the write succeeded and returned no read data. The readv
# argument line and the trailing blank lines are missing from this
# extract.
475 def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
476 write_enabler = self.write_enabler(we_tag)
477 renew_secret = self.renew_secret(lease_tag)
478 cancel_secret = self.cancel_secret(lease_tag)
479 rstaraw = ss.remote_slot_testv_and_readv_and_writev
# empty testv/writev per share: creates the slot without writing data
480 testandwritev = dict( [ (shnum, ([], [], None) )
481 for shnum in sharenums ] )
483 rc = rstaraw(storage_index,
484 (write_enabler, renew_secret, cancel_secret),
# (the testandwritev / read-vector argument lines are missing here)
487 (did_write, readv_data) = rc
488 self.failUnless(did_write)
489 self.failUnless(isinstance(readv_data, dict))
490 self.failUnlessEqual(len(readv_data), 0)
# Mutable-slot basics: empty slots read back as empty strings, writes are
# visible to subsequent reads, a bad write-enabler raises
# BadWriteEnablerError, and a failing test vector rejects the write and
# leaves the data untouched. Many continuation lines (expected-value
# literals, test-vector bodies) are missing from this extract.
492 def test_allocate(self):
493 ss = self.create("test_allocate")
494 self.allocate(ss, "si1", "we1", self._lease_secret.next(),
# (remaining allocate() arguments are missing from this extract)
497 read = ss.remote_slot_readv
498 self.failUnlessEqual(read("si1", [0], [(0, 10)]),
# passing an empty share list means "all shares"
500 self.failUnlessEqual(read("si1", [], [(0, 10)]),
501 {0: [""], 1: [""], 2: [""]})
# reads past the end of an empty share also return ""
502 self.failUnlessEqual(read("si1", [0], [(100, 10)]),
506 secrets = ( self.write_enabler("we1"),
507 self.renew_secret("we1"),
508 self.cancel_secret("we1") )
# 100 bytes: "0"*10 + "1"*10 + ... + "9"*10
509 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
510 write = ss.remote_slot_testv_and_readv_and_writev
511 answer = write("si1", secrets,
512 {0: ([], [(0,data)], None)},
514 self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )
516 self.failUnlessEqual(read("si1", [0], [(0,20)]),
517 {0: ["00000000001111111111"]})
518 self.failUnlessEqual(read("si1", [0], [(95,10)]),
520 #self.failUnlessEqual(s0.remote_get_length(), 100)
# a wrong write-enabler must be rejected before anything is written
522 bad_secrets = ("bad write enabler", secrets[1], secrets[2])
523 self.failUnlessRaises(BadWriteEnablerError,
524 write, "si1", bad_secrets,
527 # this testv should fail
528 answer = write("si1", secrets,
529 {0: ([(0, 12, "eq", "444444444444"),
530 (20, 5, "eq", "22222"),
# a failed test vector returns False plus the actual data it compared
537 self.failUnlessEqual(answer, (False,
538 {0: ["000000000011", "22222"],
# and the share contents are unchanged
542 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
545 answer = write("si1", secrets,
546 {0: ([(10, 5, "lt", "11111"),
553 self.failUnlessEqual(answer, (False,
558 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Exhaustive check of the test-vector comparison operators (lt, le, eq,
# ne, ge, gt) against the fixed substring "11111" at offset 10: for each
# operator, one case that must fail (data untouched) and one that must
# pass (share overwritten with "y"*100). The write-vector continuation
# lines and the "reset" writes between cases are missing from this
# extract.
561 def test_operators(self):
562 # test operators, the data we're comparing is '11111' in all cases.
563 # test both fail+pass, reset data after each one.
564 ss = self.create("test_operators")
566 secrets = ( self.write_enabler("we1"),
567 self.renew_secret("we1"),
568 self.cancel_secret("we1") )
569 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
570 write = ss.remote_slot_testv_and_readv_and_writev
571 read = ss.remote_slot_readv
# initial population of share 0
574 write("si1", secrets,
575 {0: ([], [(0,data)], None)},
# lt: "11111" < "11110" is false -> write rejected
581 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11110"),
586 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
587 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
588 self.failUnlessEqual(read("si1", [], [(0,100)]), {0: [data]})
# lt: "11111" < "11111" is false -> rejected
591 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11111"),
596 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
597 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# lt: "11111" < "11112" is true -> write applied ("y"*100)
600 answer = write("si1", secrets, {0: ([(10, 5, "lt", "11112"),
605 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
606 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# le: fails only for "11110"
610 answer = write("si1", secrets, {0: ([(10, 5, "le", "11110"),
615 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
616 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
619 answer = write("si1", secrets, {0: ([(10, 5, "le", "11111"),
624 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
625 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
628 answer = write("si1", secrets, {0: ([(10, 5, "le", "11112"),
633 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
634 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# eq: only the exact value passes
638 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11112"),
643 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
644 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
647 answer = write("si1", secrets, {0: ([(10, 5, "eq", "11111"),
652 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
653 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ne: the exact value fails, anything else passes
657 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11111"),
662 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
663 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
666 answer = write("si1", secrets, {0: ([(10, 5, "ne", "11112"),
671 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
672 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
# ge: fails only for "11112"
676 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11110"),
681 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
682 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
685 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11111"),
690 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
691 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
694 answer = write("si1", secrets, {0: ([(10, 5, "ge", "11112"),
699 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
700 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# gt: passes only for "11110"
704 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11110"),
709 self.failUnlessEqual(answer, (True, {0: ["11111"]}))
710 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: ["y"*100]})
713 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11111"),
718 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
719 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
722 answer = write("si1", secrets, {0: ([(10, 5, "gt", "11112"),
727 self.failUnlessEqual(answer, (False, {0: ["11111"]}))
728 self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})
# Write distinct 100-byte payloads to three shares, then read all shares
# at once with an empty share list. The readv argument line and the rest
# of the expected-answer dict are missing from this extract.
731 def test_readv(self):
732 ss = self.create("test_readv")
733 secrets = ( self.write_enabler("we1"),
734 self.renew_secret("we1"),
735 self.cancel_secret("we1") )
736 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
737 write = ss.remote_slot_testv_and_readv_and_writev
738 read = ss.remote_slot_readv
# rebinds data: share N gets ("%d" % N) * 100
739 data = [("%d" % i) * 100 for i in range(3)]
740 rc = write("si1", secrets,
741 {0: ([], [(0,data[0])], None),
742 1: ([], [(0,data[1])], None),
743 2: ([], [(0,data[2])], None),
745 self.failUnlessEqual(rc, (True, {}))
# empty share list -> read from every share
747 answer = read("si1", [], [(0, 10)])
748 self.failUnlessEqual(answer, {0: ["0"*10],
752 def compare_leases_without_timestamps(self, a, b):
753 self.failUnlessEqual(len(a), len(b))
754 for i in range(len(a)):
755 (num_a, (ownerid_a, expiration_time_a,
756 renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
757 (num_b, (ownerid_b, expiration_time_b,
758 renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
759 self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
760 cancel_secret_a, nodeid_a),
761 (num_b, ownerid_b, renew_secret_b,
762 cancel_secret_b, nodeid_b) )
# Mutable-slot lease lifecycle: identical secrets update (not duplicate) a
# lease, distinct secrets accumulate leases, container expansion preserves
# leases, reads and failed renew/cancel leave timestamps untouched, and
# cancelling the final lease deletes the slot. The `def secrets(n):`
# helper line, the f.close(), and a few other lines are missing from
# this extract.
764 def test_leases(self):
765 ss = self.create("test_leases")
# body of the local secrets(n) helper (its def line is missing): a fixed
# write-enabler plus per-n renew/cancel secrets
767 return ( self.write_enabler("we1"),
768 self.renew_secret("we1-%d" % n),
769 self.cancel_secret("we1-%d" % n) )
770 data = "".join([ ("%d" % i) * 10 for i in range(10) ])
771 write = ss.remote_slot_testv_and_readv_and_writev
772 read = ss.remote_slot_readv
773 rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
774 self.failUnlessEqual(rc, (True, {}))
776 # create a random non-numeric file in the bucket directory, to
777 # exercise the code that's supposed to ignore those.
778 bucket_dir = os.path.join(self.workdir("test_leases"),
779 "shares", idlib.b2a("si1"))
780 f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
781 f.write("you ought to be ignoring me\n")
784 # re-allocate the slots and use the same secrets, that should update
786 write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
# explicit renewal with the matching renew secret must also work
789 ss.remote_renew_lease("si1", secrets(0)[1])
791 # now allocate them with a bunch of different secrets, to trigger the
792 # extended lease code
793 write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
794 write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
795 write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
796 write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
797 write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])
# drop one of the six leases again
800 ss.remote_cancel_lease("si1", secrets(5)[2])
# inspect share 0's lease records directly through MutableShareFile
802 s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
803 all_leases = s0.debug_get_leases()
804 self.failUnlessEqual(len(all_leases), 5)
806 # and write enough data to expand the container, forcing the server
808 write("si1", secrets(0),
809 {0: ([], [(0,data)], 200), },
812 # read back the leases, make sure they're still intact.
813 self.compare_leases_without_timestamps(all_leases,
814 s0.debug_get_leases())
816 ss.remote_renew_lease("si1", secrets(0)[1])
817 ss.remote_renew_lease("si1", secrets(1)[1])
818 ss.remote_renew_lease("si1", secrets(2)[1])
819 ss.remote_renew_lease("si1", secrets(3)[1])
820 ss.remote_renew_lease("si1", secrets(4)[1])
821 self.compare_leases_without_timestamps(all_leases,
822 s0.debug_get_leases())
823 # get a new copy of the leases, with the current timestamps. Reading
824 # data and failing to renew/cancel leases should leave the timestamps
826 all_leases = s0.debug_get_leases()
827 # renewing with a bogus token should prompt an error message
829 # TODO: examine the exception thus raised, make sure the old nodeid
830 # is present, to provide for share migration
831 self.failUnlessRaises(IndexError,
832 ss.remote_renew_lease, "si1",
834 # same for cancelling
835 self.failUnlessRaises(IndexError,
836 ss.remote_cancel_lease, "si1",
# exact equality here: failed renew/cancel may not touch timestamps
838 self.failUnlessEqual(all_leases, s0.debug_get_leases())
840 # reading shares should not modify the timestamp
841 read("si1", [], [(0,200)])
842 self.failUnlessEqual(all_leases, s0.debug_get_leases())
# ordinary writes (including container growth) keep leases intact
844 write("si1", secrets(0),
845 {0: ([], [(200, "make me bigger")], None)}, [])
846 self.compare_leases_without_timestamps(all_leases,
847 s0.debug_get_leases())
849 write("si1", secrets(0),
850 {0: ([], [(500, "make me really bigger")], None)}, [])
851 self.compare_leases_without_timestamps(all_leases,
852 s0.debug_get_leases())
854 # now cancel them all
855 ss.remote_cancel_lease("si1", secrets(0)[2])
856 ss.remote_cancel_lease("si1", secrets(1)[2])
857 ss.remote_cancel_lease("si1", secrets(2)[2])
858 ss.remote_cancel_lease("si1", secrets(3)[2])
860 # the slot should still be there
861 remaining_shares = read("si1", [], [(0,10)])
862 self.failUnlessEqual(len(remaining_shares), 1)
863 self.failUnlessEqual(len(s0.debug_get_leases()), 1)
865 ss.remote_cancel_lease("si1", secrets(4)[2])
866 # now the slot should be gone
867 no_shares = read("si1", [], [(0,10)])
868 self.failUnlessEqual(no_shares, {})