from twisted.trial import unittest

from twisted.internet import defer
import time, os.path, stat
import itertools
from allmydata import interfaces
from allmydata.util import fileutil, hashutil
from allmydata.storage import BucketWriter, BucketReader, \
     WriteBucketProxy, ReadBucketProxy, StorageServer, MutableShareFile, \
     storage_index_to_dir
from allmydata.interfaces import BadWriteEnablerError
from allmydata.test.common import LoggingServiceParent

class FakeCanary:
    # a stand-in for the foolscap canary argument that the storage API
    # expects; these are the only methods the tests ever invoke on it
    def notifyOnDisconnect(self, *args, **kwargs):
        pass
    def dontNotifyOnDisconnect(self, marker):
        pass

class Bucket(unittest.TestCase):
    def make_workdir(self, name):
        basedir = os.path.join("storage", "Bucket", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        return incoming, final

    def bucket_writer_closed(self, bw, consumed):
        pass

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def test_create(self):
        incoming, final = self.make_workdir("test_create")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*25)
        bw.remote_write(75, "d"*7)
        bw.remote_close()

    def test_readwrite(self):
        incoming, final = self.make_workdir("test_readwrite")
        bw = BucketWriter(self, incoming, final, 200, self.make_lease(),
                          FakeCanary())
        bw.remote_write(0, "a"*25)
        bw.remote_write(25, "b"*25)
        bw.remote_write(50, "c"*7) # last block may be short
        bw.remote_close()

        # now read from it
        br = BucketReader(bw.finalhome)
        self.failUnlessEqual(br.remote_read(0, 25), "a"*25)
        self.failUnlessEqual(br.remote_read(25, 25), "b"*25)
        self.failUnlessEqual(br.remote_read(50, 7), "c"*7)

class RemoteBucket:

    def callRemote(self, methname, *args, **kwargs):
        def _call():
            meth = getattr(self.target, "remote_" + methname)
            return meth(*args, **kwargs)
        return defer.maybeDeferred(_call)
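
# A usage sketch (illustrative only): point .target at any object that has
# remote_* methods, and callRemote dispatches to them through a Deferred,
# mimicking a live foolscap RemoteReference:
#
#   rb = RemoteBucket()
#   rb.target = bw                          # e.g. a BucketWriter
#   d = rb.callRemote("write", 0, "a"*25)   # invokes bw.remote_write(0, ...)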

class BucketProxy(unittest.TestCase):
    def make_bucket(self, name, size):
        basedir = os.path.join("storage", "BucketProxy", name)
        incoming = os.path.join(basedir, "tmp", "bucket")
        final = os.path.join(basedir, "bucket")
        fileutil.make_dirs(basedir)
        fileutil.make_dirs(os.path.join(basedir, "tmp"))
        bw = BucketWriter(self, incoming, final, size, self.make_lease(),
                          FakeCanary())
        rb = RemoteBucket()
        rb.target = bw
        return bw, rb, final

    def make_lease(self):
        owner_num = 0
        renew_secret = os.urandom(32)
        cancel_secret = os.urandom(32)
        expiration_time = time.time() + 5000
        return (owner_num, renew_secret, cancel_secret, expiration_time)

    def bucket_writer_closed(self, bw, consumed):
        pass

    def test_create(self):
        bw, rb, sharefname = self.make_bucket("test_create", 500)
        bp = WriteBucketProxy(rb,
                              data_size=300,
                              segment_size=10,
                              num_segments=5,
                              num_share_hashes=3,
                              uri_extension_size=500, nodeid=None)
        self.failUnless(interfaces.IStorageBucketWriter.providedBy(bp))

    def test_readwrite(self):
        # Let's pretend each share has 100 bytes of data, and that there are
        # 4 segments (25 bytes each), and 8 shares total. So the three
        # per-segment merkle trees (plaintext_hash_tree, crypttext_hash_tree,
        # block_hashes) will have 4 leaves and 7 nodes each. The per-share
        # merkle tree (share_hashes) has 8 leaves and 15 nodes, and we need 3
        # nodes. Furthermore, let's assume the uri_extension is 500 bytes
        # long. That should make the whole share:
        #
        # 0x24 + 100 + 7*32 + 7*32 + 7*32 + 3*(2+32) + 4+500 = 1414 bytes long
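
        # A quick inline check of that arithmetic (illustrative; 0x24 = 36
        # is the fixed header size the layout comment above assumes):
        self.failUnlessEqual(0x24 + 100 + 3*(7*32) + 3*(2+32) + (4+500), 1414)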

        plaintext_hashes = [hashutil.tagged_hash("plain", "bar%d" % i)
                            for i in range(7)]
        crypttext_hashes = [hashutil.tagged_hash("crypt", "bar%d" % i)
                            for i in range(7)]
        block_hashes = [hashutil.tagged_hash("block", "bar%d" % i)
                        for i in range(7)]
        share_hashes = [(i, hashutil.tagged_hash("share", "bar%d" % i))
                        for i in (1,9,13)]
        uri_extension = "s" + "E"*498 + "e"

        bw, rb, sharefname = self.make_bucket("test_readwrite", 1414)
        bp = WriteBucketProxy(rb,
                              data_size=95,
                              segment_size=25,
                              num_segments=4,
                              num_share_hashes=3,
                              uri_extension_size=len(uri_extension),
                              nodeid=None)

        d = bp.start()
        d.addCallback(lambda res: bp.put_block(0, "a"*25))
        d.addCallback(lambda res: bp.put_block(1, "b"*25))
        d.addCallback(lambda res: bp.put_block(2, "c"*25))
        d.addCallback(lambda res: bp.put_block(3, "d"*20))
        d.addCallback(lambda res: bp.put_plaintext_hashes(plaintext_hashes))
        d.addCallback(lambda res: bp.put_crypttext_hashes(crypttext_hashes))
        d.addCallback(lambda res: bp.put_block_hashes(block_hashes))
        d.addCallback(lambda res: bp.put_share_hashes(share_hashes))
        d.addCallback(lambda res: bp.put_uri_extension(uri_extension))
        d.addCallback(lambda res: bp.close())

        # now read everything back
        def _start_reading(res):
            br = BucketReader(sharefname)
            rb = RemoteBucket()
            rb.target = br
            rbp = ReadBucketProxy(rb)
            self.failUnless(interfaces.IStorageBucketReader.providedBy(rbp))

            d1 = rbp.startIfNecessary()
            d1.addCallback(lambda res: rbp.get_block(0))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "a"*25))
            d1.addCallback(lambda res: rbp.get_block(1))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "b"*25))
            d1.addCallback(lambda res: rbp.get_block(2))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "c"*25))
            d1.addCallback(lambda res: rbp.get_block(3))
            d1.addCallback(lambda res: self.failUnlessEqual(res, "d"*20))

            d1.addCallback(lambda res: rbp.get_plaintext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, plaintext_hashes))
            d1.addCallback(lambda res: rbp.get_crypttext_hashes())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, crypttext_hashes))
            d1.addCallback(lambda res: rbp.get_block_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, block_hashes))
            d1.addCallback(lambda res: rbp.get_share_hashes())
            d1.addCallback(lambda res: self.failUnlessEqual(res, share_hashes))
            d1.addCallback(lambda res: rbp.get_uri_extension())
            d1.addCallback(lambda res:
                           self.failUnlessEqual(res, uri_extension))

            return d1

        d.addCallback(_start_reading)

        return d

class Server(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "Server", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def allocate(self, ss, storage_index, sharenums, size):
        renew_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        cancel_secret = hashutil.tagged_hash("blah", "%d" % self._lease_secret.next())
        return ss.remote_allocate_buckets(storage_index,
                                          renew_secret, cancel_secret,
                                          sharenums, size, FakeCanary())
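    # (remote_allocate_buckets returns a pair (alreadygot, bucketwriters):
    # the set of share numbers the server already holds, plus a dict mapping
    # share number to a BucketWriter for each share it will accept. The
    # tests below unpack this pair as "already, writers".)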

    def test_dont_overfill_dirs(self):
        """
        This test asserts that if you add a second share whose storage index
        shares lots of leading bits with an extant share (but isn't the
        exact same storage index), this won't add an entry to the share
        directory.
        """
        ss = self.create("test_dont_overfill_dirs")
        already, writers = self.allocate(ss, "storageindex", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        children_of_storedir = set(os.listdir(storedir))

        # Now store another one under another storageindex that has leading
        # chars the same as the first storageindex.
        already, writers = self.allocate(ss, "storageindey", [0], 10)
        for i, wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        storedir = os.path.join(self.workdir("test_dont_overfill_dirs"),
                                "shares")
        new_children_of_storedir = set(os.listdir(storedir))
        self.failUnlessEqual(children_of_storedir, new_children_of_storedir)

    def test_remove_incoming(self):
        ss = self.create("test_remove_incoming")
        already, writers = self.allocate(ss, "vid", range(3), 10)
        for i,wb in writers.items():
            wb.remote_write(0, "%10d" % i)
            wb.remote_close()
        incomingdir = os.path.dirname(os.path.dirname(os.path.dirname(wb.incominghome)))
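        # (assuming the incoming/<prefix>/<storageindex>/<shnum> layout,
        # three dirname() calls climb from the share file up to the
        # top-level "incoming" directory)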
        self.failIf(os.path.exists(incomingdir))

    def test_allocate(self):
        ss = self.create("test_allocate")

        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        canary = FakeCanary()
        already,writers = self.allocate(ss, "vid", [0,1,2], 75)
        self.failUnlessEqual(already, set())
        self.failUnlessEqual(set(writers.keys()), set([0,1,2]))

        # while the buckets are open, they should not count as readable
        self.failUnlessEqual(ss.remote_get_buckets("vid"), {})

        # close the buckets
        for i,wb in writers.items():
            wb.remote_write(0, "%25d" % i)
            wb.remote_close()

        # now they should be readable
        b = ss.remote_get_buckets("vid")
        self.failUnlessEqual(set(b.keys()), set([0,1,2]))
        self.failUnlessEqual(b[0].remote_read(0, 25), "%25d" % 0)

        # now if we ask about writing again, the server should offer those
        # three buckets as already present. It should offer them even if we
        # don't ask about those specific ones.
        already,writers = self.allocate(ss, "vid", [2,3,4], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([3,4]))

        # while those two buckets are open for writing, the server should
        # refuse to offer them to uploaders
        already,writers = self.allocate(ss, "vid", [2,3,4,5], 75)
        self.failUnlessEqual(already, set([0,1,2]))
        self.failUnlessEqual(set(writers.keys()), set([5]))

    def test_sizelimits(self):
        ss = self.create("test_sizelimits", 5000)
        canary = FakeCanary()
        # a newly created and filled share incurs this much overhead, beyond
        # the size we request.
        OVERHEAD = 3*4
        LEASE_SIZE = 4+32+32+4
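        # (presumed breakdown of LEASE_SIZE: a 4-byte owner number, a
        # 32-byte renew secret, a 32-byte cancel secret, and a 4-byte
        # expiration time, matching the lease tuples built by make_lease()
        # earlier in this file)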

        already,writers = self.allocate(ss, "vid1", [0,1,2], 1000)
        self.failUnlessEqual(len(writers), 3)
        # now the StorageServer should have 3000 bytes provisionally
        # allocated, allowing only 2000 more to be claimed
        self.failUnlessEqual(len(ss._active_writers), 3)

        # allocating 1001-byte shares only leaves room for one
        already2,writers2 = self.allocate(ss, "vid2", [0,1,2], 1001)
        self.failUnlessEqual(len(writers2), 1)
        self.failUnlessEqual(len(ss._active_writers), 4)

        # we abandon the first set, so their provisional allocation should be
        # returned
        del already
        del writers
        self.failUnlessEqual(len(ss._active_writers), 1)
        # now we have a provisional allocation of 1001 bytes

        # and we close the second set, so their provisional allocation should
        # become real, long-term allocation, and grow to include the
        # overhead
        for bw in writers2.values():
            bw.remote_write(0, "a"*25)
            bw.remote_close()
        del already2
        del writers2
        del bw
        self.failUnlessEqual(len(ss._active_writers), 0)

        allocated = 1001 + OVERHEAD + LEASE_SIZE
        # now there should be ALLOCATED=1001+12+72=1085 bytes allocated, and
        # 5000-1085=3915 free, therefore we can fit 39 100-byte shares
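        # a quick inline check of that arithmetic (illustrative only):
        self.failUnlessEqual((5000 - allocated) // 100, 39)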
        already3,writers3 = self.allocate(ss,"vid3", range(100), 100)
        self.failUnlessEqual(len(writers3), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

        del already3
        del writers3
        self.failUnlessEqual(len(ss._active_writers), 0)
        ss.disownServiceParent()
        del ss

        # creating a new StorageServer in the same directory should see the
        # same usage.

        # metadata that goes into the share file is counted upon share close,
        # as well as at startup. metadata that goes into other files will not
        # be counted until the next startup, so if we were creating any
        # extra-file metadata, the allocation would be more than 'allocated'
        # and this test would need to be changed.
        ss = self.create("test_sizelimits", 5000)
        already4,writers4 = self.allocate(ss, "vid4", range(100), 100)
        self.failUnlessEqual(len(writers4), 39)
        self.failUnlessEqual(len(ss._active_writers), 39)

    def test_seek(self):
        basedir = self.workdir("test_seek_behavior")
        fileutil.make_dirs(basedir)
        filename = os.path.join(basedir, "testfile")
        f = open(filename, "wb")
        f.write("start")
        f.close()
        # mode="w" allows seeking-to-create-holes, but truncates pre-existing
        # files. mode="a" preserves previous contents but does not allow
        # seeking-to-create-holes. mode="r+" allows both.
        f = open(filename, "rb+")
        f.seek(100)
        f.write("100")
        f.close()
        filelen = os.stat(filename)[stat.ST_SIZE]
        self.failUnlessEqual(filelen, 100+3)
        f2 = open(filename, "rb")
        self.failUnlessEqual(f2.read(5), "start")

    def test_leases(self):
        ss = self.create("test_leases")
        canary = FakeCanary()
        sharenums = range(5)
        size = 100

        rs0,cs0 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si0", rs0, cs0,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si0"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs0]))

        rs1,cs1 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs1, cs1,
                                                     sharenums, size, canary)
        for wb in writers.values():
            wb.remote_close()

        # take out a second lease on si1
        rs2,cs2 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si1", rs2, cs2,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 5)
        self.failUnlessEqual(len(writers), 0)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 2)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs1, rs2]))

        # check that si0 is readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # renew the first lease. Only the proper renew_secret should work
        ss.remote_renew_lease("si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", cs0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs1)

        # check that si0 is still readable
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 5)

        # cancel it. Only the proper cancel_secret should work
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", rs0)
        self.failUnlessRaises(IndexError, ss.remote_cancel_lease, "si0", cs1)
        ss.remote_cancel_lease("si0", cs0)

        # si0 should now be gone
        readers = ss.remote_get_buckets("si0")
        self.failUnlessEqual(len(readers), 0)
        # and the renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si0", rs0)

        # cancel the first lease on si1, leaving the second in place
        ss.remote_cancel_lease("si1", cs1)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 5)
        # the corresponding renew should no longer work
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 1)
        self.failUnlessEqual(set([l[1] for l in leases]), set([rs2]))

        ss.remote_renew_lease("si1", rs2)
        # cancelling the second should make it go away
        ss.remote_cancel_lease("si1", cs2)
        readers = ss.remote_get_buckets("si1")
        self.failUnlessEqual(len(readers), 0)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs1)
        self.failUnlessRaises(IndexError, ss.remote_renew_lease, "si1", rs2)

        leases = list(ss.get_leases("si1"))
        self.failUnlessEqual(len(leases), 0)

        # test overlapping uploads
        rs3,cs3 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        rs4,cs4 = (hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()),
                   hashutil.tagged_hash("blah", "%d" % self._lease_secret.next()))
        already,writers = ss.remote_allocate_buckets("si3", rs3, cs3,
                                                     sharenums, size, canary)
        self.failUnlessEqual(len(already), 0)
        self.failUnlessEqual(len(writers), 5)
        already2,writers2 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already2), 0)
        self.failUnlessEqual(len(writers2), 0)
        for wb in writers.values():
            wb.remote_close()

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 1)

        already3,writers3 = ss.remote_allocate_buckets("si3", rs4, cs4,
                                                       sharenums, size, canary)
        self.failUnlessEqual(len(already3), 5)
        self.failUnlessEqual(len(writers3), 0)

        leases = list(ss.get_leases("si3"))
        self.failUnlessEqual(len(leases), 2)

class MutableServer(unittest.TestCase):

    def setUp(self):
        self.sparent = LoggingServiceParent()
        self._lease_secret = itertools.count()
    def tearDown(self):
        return self.sparent.stopService()

    def workdir(self, name):
        basedir = os.path.join("storage", "MutableServer", name)
        return basedir

    def create(self, name, sizelimit=None):
        workdir = self.workdir(name)
        ss = StorageServer(workdir, sizelimit)
        ss.setServiceParent(self.sparent)
        ss.setNodeID("\x00" * 32)
        return ss

    def test_create(self):
        ss = self.create("test_create")

    def write_enabler(self, we_tag):
        return hashutil.tagged_hash("we_blah", we_tag)

    def renew_secret(self, tag):
        return hashutil.tagged_hash("renew_blah", str(tag))

    def cancel_secret(self, tag):
        return hashutil.tagged_hash("cancel_blah", str(tag))

    def allocate(self, ss, storage_index, we_tag, lease_tag, sharenums, size):
        write_enabler = self.write_enabler(we_tag)
        renew_secret = self.renew_secret(lease_tag)
        cancel_secret = self.cancel_secret(lease_tag)
        rstaraw = ss.remote_slot_testv_and_readv_and_writev
        testandwritev = dict( [ (shnum, ([], [], None) )
                                for shnum in sharenums ] )
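        # (each testandwritev entry is a (testv, writev, new_length) triple:
        # testv is a list of (offset, length, operator, specimen) conditions,
        # writev is a list of (offset, data) writes to apply if they all
        # pass, and new_length, when not None, resizes the share)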
        readv = []
        rc = rstaraw(storage_index,
                     (write_enabler, renew_secret, cancel_secret),
                     testandwritev,
                     readv)
        (did_write, readv_data) = rc
        self.failUnless(did_write)
        self.failUnless(isinstance(readv_data, dict))
        self.failUnlessEqual(len(readv_data), 0)

    def test_allocate(self):
        ss = self.create("test_allocate")
        self.allocate(ss, "si1", "we1", self._lease_secret.next(),
                      set([0,1,2]), 100)

        read = ss.remote_slot_readv
        self.failUnlessEqual(read("si1", [0], [(0, 10)]),
                             {0: [""]})
        self.failUnlessEqual(read("si1", [], [(0, 10)]),
                             {0: [""], 1: [""], 2: [""]})
        self.failUnlessEqual(read("si1", [0], [(100, 10)]),
                             {0: [""]})

        # try writing to one
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        answer = write("si1", secrets,
                       {0: ([], [(0,data)], None)},
                       [])
        self.failUnlessEqual(answer, (True, {0:[],1:[],2:[]}) )

        self.failUnlessEqual(read("si1", [0], [(0,20)]),
                             {0: ["00000000001111111111"]})
        self.failUnlessEqual(read("si1", [0], [(95,10)]),
                             {0: ["99999"]})
        #self.failUnlessEqual(s0.remote_get_length(), 100)

        bad_secrets = ("bad write enabler", secrets[1], secrets[2])
        f = self.failUnlessRaises(BadWriteEnablerError,
                                  write, "si1", bad_secrets,
                                  {}, [])
        self.failUnless("The write enabler was recorded by nodeid 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa'." in f, f)

        # this testv should fail
        answer = write("si1", secrets,
                       {0: ([(0, 12, "eq", "444444444444"),
                             (20, 5, "eq", "22222"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(0,12), (20,5)])
        self.failUnlessEqual(answer, (False,
                                      {0: ["000000000011", "22222"],
                                       1: ["", ""],
                                       2: ["", ""],
                                       }))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

        # as should this one
        answer = write("si1", secrets,
                       {0: ([(10, 5, "lt", "11111"),
                             ],
                            [(0, "x"*100)],
                            None),
                        },
                       [(10,5)])
        self.failUnlessEqual(answer, (False,
                                      {0: ["11111"],
                                       1: [""],
                                       2: [""]},
                                      ))
        self.failUnlessEqual(read("si1", [0], [(0,100)]), {0: [data]})

    def test_operators(self):
        # test operators, the data we're comparing is '11111' in all cases.
        # test both fail+pass, reset data after each one.
        ss = self.create("test_operators")

        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv

        def reset():
            write("si1", secrets,
                  {0: ([], [(0,data)], None)},
                  [])

        reset()

        # Each case is (operator, specimen, should_pass): the test vector
        # compares the five bytes at offset 10 (currently "11111") against
        # the specimen, and the write of "y"*100 is applied only if the
        # test passes. The readv in the answer always runs against the
        # pre-write contents, so it reports "11111" either way.
        cases = [ ("lt", "11110", False),
                  ("lt", "11111", False),
                  ("lt", "11112", True),
                  ("le", "11110", False),
                  ("le", "11111", True),
                  ("le", "11112", True),
                  ("eq", "11112", False),
                  ("eq", "11111", True),
                  ("ne", "11111", False),
                  ("ne", "11112", True),
                  ("ge", "11110", True),
                  ("ge", "11111", True),
                  ("ge", "11112", False),
                  ("gt", "11110", True),
                  ("gt", "11111", False),
                  ("gt", "11112", False),
                  ]
        for op, specimen, should_pass in cases:
            answer = write("si1", secrets, {0: ([(10, 5, op, specimen),
                                                 ],
                                                [(0, "y"*100)],
                                                None,
                                                )}, [(10,5)])
            self.failUnlessEqual(answer, (should_pass, {0: ["11111"]}))
            if should_pass:
                self.failUnlessEqual(read("si1", [0], [(0,100)]),
                                     {0: ["y"*100]})
            else:
                self.failUnlessEqual(read("si1", [0], [(0,100)]),
                                     {0: [data]})
            # reading with an empty sharenum list should give the same answer
            self.failUnlessEqual(read("si1", [], [(0,100)]),
                                 read("si1", [0], [(0,100)]))
            reset()

    def test_readv(self):
        ss = self.create("test_readv")
        secrets = ( self.write_enabler("we1"),
                    self.renew_secret("we1"),
                    self.cancel_secret("we1") )
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        data = [("%d" % i) * 100 for i in range(3)]
        rc = write("si1", secrets,
                   {0: ([], [(0,data[0])], None),
                    1: ([], [(0,data[1])], None),
                    2: ([], [(0,data[2])], None),
                    }, [])
        self.failUnlessEqual(rc, (True, {}))

        answer = read("si1", [], [(0, 10)])
        self.failUnlessEqual(answer, {0: ["0"*10],
                                      1: ["1"*10],
                                      2: ["2"*10]})

    def compare_leases_without_timestamps(self, a, b):
        self.failUnlessEqual(len(a), len(b))
        for i in range(len(a)):
            (num_a, (ownerid_a, expiration_time_a,
                     renew_secret_a, cancel_secret_a, nodeid_a)) = a[i]
            (num_b, (ownerid_b, expiration_time_b,
                     renew_secret_b, cancel_secret_b, nodeid_b)) = b[i]
            self.failUnlessEqual( (num_a, ownerid_a, renew_secret_a,
                                   cancel_secret_a, nodeid_a),
                                  (num_b, ownerid_b, renew_secret_b,
                                   cancel_secret_b, nodeid_b) )

    def test_leases(self):
        ss = self.create("test_leases")
        def secrets(n):
            return ( self.write_enabler("we1"),
                     self.renew_secret("we1-%d" % n),
                     self.cancel_secret("we1-%d" % n) )
        data = "".join([ ("%d" % i) * 10 for i in range(10) ])
        write = ss.remote_slot_testv_and_readv_and_writev
        read = ss.remote_slot_readv
        rc = write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])
        self.failUnlessEqual(rc, (True, {}))

        # create a random non-numeric file in the bucket directory, to
        # exercise the code that's supposed to ignore those.
        bucket_dir = os.path.join(self.workdir("test_leases"),
                                  "shares", storage_index_to_dir("si1"))
        f = open(os.path.join(bucket_dir, "ignore_me.txt"), "w")
        f.write("you ought to be ignoring me\n")
        f.close()

        # re-allocate the slots with the same secrets; that should update
        # the lease
        write("si1", secrets(0), {0: ([], [(0,data)], None)}, [])

        # renew it directly
        ss.remote_renew_lease("si1", secrets(0)[1])

        # now allocate them with a bunch of different secrets, to trigger the
        # extended lease code
        write("si1", secrets(1), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(2), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(3), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(4), {0: ([], [(0,data)], None)}, [])
        write("si1", secrets(5), {0: ([], [(0,data)], None)}, [])

        # cancel one of them
        ss.remote_cancel_lease("si1", secrets(5)[2])

        s0 = MutableShareFile(os.path.join(bucket_dir, "0"))
        all_leases = s0.debug_get_leases()
        self.failUnlessEqual(len(all_leases), 5)

        # and write enough data to expand the container, forcing the server
        # to move the leases
        write("si1", secrets(0),
              {0: ([], [(0,data)], 200), },
              [])

        # read back the leases, make sure they're still intact.
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        ss.remote_renew_lease("si1", secrets(0)[1])
        ss.remote_renew_lease("si1", secrets(1)[1])
        ss.remote_renew_lease("si1", secrets(2)[1])
        ss.remote_renew_lease("si1", secrets(3)[1])
        ss.remote_renew_lease("si1", secrets(4)[1])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())
        # get a new copy of the leases, with the current timestamps. Reading
        # data and failing to renew/cancel leases should leave the timestamps
        # alone.
        all_leases = s0.debug_get_leases()
        # renewing with a bogus token should prompt an error message

        # TODO: examine the exception thus raised, make sure the old nodeid
        # is present, to provide for share migration
        self.failUnlessRaises(IndexError,
                              ss.remote_renew_lease, "si1",
                              secrets(20)[1])
        # same for cancelling
        self.failUnlessRaises(IndexError,
                              ss.remote_cancel_lease, "si1",
                              secrets(20)[2])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        # reading shares should not modify the timestamp
        read("si1", [], [(0,200)])
        self.failUnlessEqual(all_leases, s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(200, "make me bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        write("si1", secrets(0),
              {0: ([], [(500, "make me really bigger")], None)}, [])
        self.compare_leases_without_timestamps(all_leases,
                                               s0.debug_get_leases())

        # now cancel them all
        ss.remote_cancel_lease("si1", secrets(0)[2])
        ss.remote_cancel_lease("si1", secrets(1)[2])
        ss.remote_cancel_lease("si1", secrets(2)[2])
        ss.remote_cancel_lease("si1", secrets(3)[2])

        # the slot should still be there
        remaining_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(len(remaining_shares), 1)
        self.failUnlessEqual(len(s0.debug_get_leases()), 1)

        ss.remote_cancel_lease("si1", secrets(4)[2])
        # now the slot should be gone
        no_shares = read("si1", [], [(0,10)])
        self.failUnlessEqual(no_shares, {})