import os, re, base64
from cStringIO import StringIO
+
from twisted.trial import unittest
from twisted.internet import defer, reactor
+
from allmydata import uri, client
from allmydata.nodemaker import NodeMaker
from allmydata.util import base32, consumer, fileutil, mathutil
+from allmydata.util.fileutil import abspath_expanduser_unicode
from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
ssk_pubkey_fingerprint_hash
from allmydata.util.consumer import MemoryConsumer
from allmydata.monitor import Monitor
from allmydata.test.common import ShouldFailMixin
from allmydata.test.no_network import GridTestMixin
-from foolscap.api import eventually, fireEventually
+from foolscap.api import eventually, fireEventually, flushEventualQueue
from foolscap.logging import log
from allmydata.storage_client import StorageFarmBroker
from allmydata.storage.common import storage_index_to_dir
s = FakeStorage()
peerids = [tagged_hash("peerid", "%d" % i)[:20]
for i in range(num_peers)]
- storage_broker = StorageFarmBroker(None, True)
+ storage_broker = StorageFarmBroker(None, True, 0, None)
for peerid in peerids:
fss = FakeStorageServer(peerid, s)
ann = {"anonymous-storage-FURL": "pb://%s@nowhere/fake" % base32.b2a(peerid),
dumped = servermap.dump(StringIO())
self.failUnlessIn("3-of-10", dumped.getvalue())
d.addCallback(_then)
- # Now overwrite the contents with some new contents. We want
+ # Now overwrite the contents with some new contents. We want
# to make them big enough to force the file to be uploaded
# in more than one segment.
big_contents = "contents1" * 100000 # about 900 KiB
# before, they need to be big enough to force multiple
# segments, so that we make the downloader deal with
# multiple segments.
- bigger_contents = "contents2" * 1000000 # about 9MiB
+ bigger_contents = "contents2" * 1000000 # about 9MiB
bigger_contents_uploadable = MutableData(bigger_contents)
d.addCallback(lambda ignored:
n.overwrite(bigger_contents_uploadable))
d.addCallback(_created)
return d
- def publish_mdmf(self):
+ def publish_mdmf(self, data=None):
# like publish_one, except that the result is guaranteed to be
# an MDMF file.
# self.CONTENTS should have more than one segment.
- self.CONTENTS = "This is an MDMF file" * 100000
+ if data is None:
+ data = "This is an MDMF file" * 100000
+ self.CONTENTS = data
self.uploadable = MutableData(self.CONTENTS)
self._storage = FakeStorage()
self._nodemaker = make_nodemaker(self._storage)
return d
- def publish_sdmf(self):
+ def publish_sdmf(self, data=None):
# like publish_one, except that the result is guaranteed to be
# an SDMF file
- self.CONTENTS = "This is an SDMF file" * 1000
+ if data is None:
+ data = "This is an SDMF file" * 1000
+ self.CONTENTS = data
self.uploadable = MutableData(self.CONTENTS)
self._storage = FakeStorage()
self._nodemaker = make_nodemaker(self._storage)
d.addCallback(_created)
return d
- def publish_empty_sdmf(self):
- self.CONTENTS = ""
- self.uploadable = MutableData(self.CONTENTS)
- self._storage = FakeStorage()
- self._nodemaker = make_nodemaker(self._storage, keysize=None)
- self._storage_broker = self._nodemaker.storage_broker
- d = self._nodemaker.create_mutable_file(self.uploadable,
- version=SDMF_VERSION)
- def _created(node):
- self._fn = node
- self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
- d.addCallback(_created)
- return d
-
def publish_multiple(self, version=0):
self.CONTENTS = ["Contents 0",
d.addCallback(_remove_shares)
return d
+    def test_all_but_two_shares_vanished_updated_servermap(self):
+        # tests error reporting for ticket #1742
+        #
+        # Record the currently-recoverable version, wipe every share on
+        # all but the first two servers, refresh the servermap, then
+        # confirm that downloading the now-unrecoverable version fails
+        # with NotEnoughSharesError ("ran out of servers") rather than
+        # failing in some less helpful way.
+        d = self.make_servermap()
+        def _remove_shares(servermap):
+            # remember the version that was recoverable before the wipe
+            self._version = servermap.best_recoverable_version()
+            for shares in self._storage._peers.values()[2:]:
+                shares.clear()
+            # rebuild the servermap so it reflects the share loss
+            return self.make_servermap(servermap)
+        d.addCallback(_remove_shares)
+        def _check(updated_servermap):
+            d1 = self.shouldFail(NotEnoughSharesError,
+                                 "test_all_but_two_shares_vanished_updated_servermap",
+                                 "ran out of servers",
+                                 self.do_download, updated_servermap, version=self._version)
+            return d1
+        d.addCallback(_check)
+        return d
+
def test_no_servers(self):
sb2 = make_storagebroker(num_peers=0)
# if there are no servers, then a MODE_READ servermap should come
def test_corrupt_all_encprivkey_late(self):
- # this should work for the same reason as above, but we corrupt
+ # this should work for the same reason as above, but we corrupt
# after the servermap update to exercise the error handling
# code.
# We need to remove the privkey from the node, or the retrieve
"test_verify_mdmf_bad_encprivkey_uncheckable")
return d
+    def test_verify_sdmf_empty(self):
+        # verifying a zero-length SDMF file must still report it healthy
+        # NOTE(review): the check_good label is "test_verify_sdmf", not
+        # "test_verify_sdmf_empty" -- confirm whether that is intentional
+        d = self.publish_sdmf("")
+        d.addCallback(lambda ignored: self._fn.check(Monitor(), verify=True))
+        d.addCallback(self.check_good, "test_verify_sdmf")
+        # drain foolscap's eventual-send queue before the test finishes
+        d.addCallback(flushEventualQueue)
+        return d
+
+    def test_verify_mdmf_empty(self):
+        # verifying a zero-length MDMF file must still report it healthy
+        # NOTE(review): the check_good label is "test_verify_mdmf", not
+        # "test_verify_mdmf_empty" -- confirm whether that is intentional
+        d = self.publish_mdmf("")
+        d.addCallback(lambda ignored: self._fn.check(Monitor(), verify=True))
+        d.addCallback(self.check_good, "test_verify_mdmf")
+        # drain foolscap's eventual-send queue before the test finishes
+        d.addCallback(flushEventualQueue)
+        return d
class Repair(unittest.TestCase, PublishMixin, ShouldFailMixin):
# In the buggy version, the check that precedes the retrieve+publish
# cycle uses MODE_READ, instead of MODE_REPAIR, and fails to get the
# privkey that repair needs.
- d = self.publish_empty_sdmf()
+ d = self.publish_sdmf("")
def _delete_one_share(ign):
shares = self._storage._peers
for peerid in shares:
self.c = self.g.clients[0]
self.nm = self.c.nodemaker
self.data = "test data" * 100000 # about 900 KiB; MDMF
- self.small_data = "test data" * 10 # about 90 B; SDMF
+ self.small_data = "test data" * 10 # 90 B; SDMF
-    def do_upload_mdmf(self):
-        d = self.nm.create_mutable_file(MutableData(self.data),
+    def do_upload_mdmf(self, data=None):
+        # "data" lets callers upload arbitrary contents (e.g. "" for the
+        # empty-file tests); the default keeps the old behavior of
+        # uploading self.data (about 900 KiB, i.e. multi-segment)
+        if data is None:
+            data = self.data
+        d = self.nm.create_mutable_file(MutableData(data),
                                         version=MDMF_VERSION)
         def _then(n):
             assert isinstance(n, MutableFileNode)
         d.addCallback(_then)
         return d
-    def do_upload_sdmf(self):
-        d = self.nm.create_mutable_file(MutableData(self.small_data))
+    def do_upload_sdmf(self, data=None):
+        # "data" lets callers choose the contents; the default keeps the
+        # old behavior of uploading self.small_data (90 B, single segment)
+        if data is None:
+            data = self.small_data
+        d = self.nm.create_mutable_file(MutableData(data))
         def _then(n):
             assert isinstance(n, MutableFileNode)
             assert n._protocol_version == SDMF_VERSION
fso = debug.FindSharesOptions()
storage_index = base32.b2a(n.get_storage_index())
fso.si_s = storage_index
- fso.nodedirs = [unicode(os.path.dirname(os.path.abspath(storedir)))
+ fso.nodedirs = [os.path.dirname(abspath_expanduser_unicode(unicode(storedir)))
for (i,ss,storedir)
in self.iterate_servers()]
fso.stdout = StringIO()
return d
-    def test_partial_read(self):
-        d = self.do_upload_mdmf()
-        d.addCallback(lambda ign: self.mdmf_node.get_best_readable_version())
-        modes = [("start_on_segment_boundary",
-                  mathutil.next_multiple(128 * 1024, 3), 50),
-                 ("ending_one_byte_after_segment_boundary",
-                  mathutil.next_multiple(128 * 1024, 3)-50, 51),
-                 ("zero_length_at_start", 0, 0),
-                 ("zero_length_in_middle", 50, 0),
-                 ("zero_length_at_segment_boundary",
-                  mathutil.next_multiple(128 * 1024, 3), 0),
-                 ]
+    def _test_partial_read(self, node, expected, modes, step):
+        # Shared driver for the test_partial_read_* cases below.
+        #  node: mutable file node to read from
+        #  expected: the full plaintext that was uploaded
+        #  modes: list of (name, offset, length) partial-read cases;
+        #         length=None means "read from offset to the end"
+        #  step: chunk size for the final whole-file sequential read
+        d = node.get_best_readable_version()
         for (name, offset, length) in modes:
-            d.addCallback(self._do_partial_read, name, offset, length)
-        # then read only a few bytes at a time, and see that the results are
-        # what we expect.
+            d.addCallback(self._do_partial_read, name, expected, offset, length)
+        # then read the whole thing, but only a few bytes at a time, and see
+        # that the results are what we expect.
         def _read_data(version):
             c = consumer.MemoryConsumer()
             d2 = defer.succeed(None)
-            for i in xrange(0, len(self.data), 10000):
-                d2.addCallback(lambda ignored, i=i: version.read(c, i, 10000))
+            for i in xrange(0, len(expected), step):
+                d2.addCallback(lambda ignored, i=i: version.read(c, i, step))
             d2.addCallback(lambda ignored:
-                self.failUnlessEqual(self.data, "".join(c.chunks)))
+                self.failUnlessEqual(expected, "".join(c.chunks)))
             return d2
         d.addCallback(_read_data)
         return d
-    def _do_partial_read(self, version, name, offset, length):
+
+    def _do_partial_read(self, version, name, expected, offset, length):
+        # Read [offset, offset+length) -- or offset..EOF when length is
+        # None -- from "version" and compare against the matching slice of
+        # "expected", printing a short diagnostic before failing on mismatch.
         c = consumer.MemoryConsumer()
         d = version.read(c, offset, length)
-        expected = self.data[offset:offset+length]
+        if length is None:
+            expected_range = expected[offset:]
+        else:
+            expected_range = expected[offset:offset+length]
         d.addCallback(lambda ignored: "".join(c.chunks))
         def _check(results):
-            if results != expected:
-                print
+            if results != expected_range:
+                print "read([%d]+%s) got %d bytes, not %d" % \
+                      (offset, length, len(results), len(expected_range))
                 print "got: %s ... %s" % (results[:20], results[-20:])
-                print "exp: %s ... %s" % (expected[:20], expected[-20:])
-                self.fail("results[%s] != expected" % name)
+                print "exp: %s ... %s" % (expected_range[:20], expected_range[-20:])
+                self.fail("results[%s] != expected_range" % name)
             return version # daisy-chained to next call
         d.addCallback(_check)
         return d
+    def test_partial_read_mdmf_0(self):
+        # zero-length MDMF file: a zero-byte read and a read-to-end must
+        # both succeed and return the empty string
+        data = ""
+        d = self.do_upload_mdmf(data=data)
+        modes = [("all1", 0,0),
+                 ("all2", 0,None),
+                 ]
+        d.addCallback(self._test_partial_read, data, modes, 1)
+        return d
+
+    def test_partial_read_mdmf_large(self):
+        # multi-segment MDMF file (default self.data): reads that start or
+        # end adjacent to a segment boundary, zero-length reads, and
+        # whole-file reads (both explicit length and length=None)
+        segment_boundary = mathutil.next_multiple(128 * 1024, 3)
+        modes = [("start_on_segment_boundary", segment_boundary, 50),
+                 ("ending_one_byte_after_segment_boundary", segment_boundary-50, 51),
+                 ("zero_length_at_start", 0, 0),
+                 ("zero_length_in_middle", 50, 0),
+                 ("zero_length_at_segment_boundary", segment_boundary, 0),
+                 ("complete_file1", 0, len(self.data)),
+                 ("complete_file2", 0, None),
+                 ]
+        d = self.do_upload_mdmf()
+        d.addCallback(self._test_partial_read, self.data, modes, 10000)
+        return d
+
+    def test_partial_read_sdmf_0(self):
+        # zero-length SDMF file: a zero-byte read and a read-to-end must
+        # both succeed and return the empty string
+        data = ""
+        modes = [("all1", 0,0),
+                 ("all2", 0,None),
+                 ]
+        d = self.do_upload_sdmf(data=data)
+        d.addCallback(self._test_partial_read, data, modes, 1)
+        return d
+
+    def test_partial_read_sdmf_2(self):
+        # two-byte SDMF file: single-byte reads at each position, plus
+        # read-to-end and whole-file variants
+        data = "hi"
+        modes = [("one_byte", 0, 1),
+                 ("last_byte", 1, 1),
+                 ("last_byte2", 1, None),
+                 ("complete_file", 0, 2),
+                 ("complete_file2", 0, None),
+                 ]
+        d = self.do_upload_sdmf(data=data)
+        d.addCallback(self._test_partial_read, data, modes, 1)
+        return d
+
+    def test_partial_read_sdmf_90(self):
+        # 90-byte SDMF file (default self.small_data): mid-file reads,
+        # zero-length reads (including one exactly at EOF), and
+        # whole-file reads
+        modes = [("start_at_middle", 50, 40),
+                 ("start_at_middle2", 50, None),
+                 ("zero_length_at_start", 0, 0),
+                 ("zero_length_in_middle", 50, 0),
+                 ("zero_length_at_end", 90, 0),
+                 ("complete_file1", 0, None),
+                 ("complete_file2", 0, 90),
+                 ]
+        d = self.do_upload_sdmf()
+        d.addCallback(self._test_partial_read, self.small_data, modes, 10)
+        return d
+
+    def test_partial_read_sdmf_100(self):
+        # 100-byte SDMF file: mid-file reads, zero-length reads, and
+        # whole-file reads (explicit length and length=None)
+        data = "test data "*10
+        modes = [("start_at_middle", 50, 50),
+                 ("start_at_middle2", 50, None),
+                 ("zero_length_at_start", 0, 0),
+                 ("zero_length_in_middle", 50, 0),
+                 ("complete_file1", 0, 100),
+                 ("complete_file2", 0, None),
+                 ]
+        d = self.do_upload_sdmf(data=data)
+        d.addCallback(self._test_partial_read, data, modes, 10)
+        return d
+
     def _test_read_and_download(self, node, expected):
         d = node.get_best_readable_version()
         def _read_data(version):
             c = consumer.MemoryConsumer()
+            c2 = consumer.MemoryConsumer()
             d2 = defer.succeed(None)
             d2.addCallback(lambda ignored: version.read(c))
             d2.addCallback(lambda ignored:
                            self.failUnlessEqual(expected, "".join(c.chunks)))
+
+            # a read() with an explicit offset/size covering the whole file
+            # must return the same bytes as the unbounded read above
+            d2.addCallback(lambda ignored: version.read(c2, offset=0,
+                                                        size=len(expected)))
+            d2.addCallback(lambda ignored:
+                           self.failUnlessEqual(expected, "".join(c2.chunks)))
             return d2
         d.addCallback(_read_data)
         d.addCallback(lambda ignored: node.download_best_version())
def setUp(self):
GridTestMixin.setUp(self)
self.basedir = self.mktemp()
- self.set_up_grid()
+ self.set_up_grid(num_servers=13)
self.c = self.g.clients[0]
self.nm = self.c.nodemaker
self.data = "testdata " * 100000 # about 900 KiB; MDMF
- self.small_data = "test data" * 10 # about 90 B; SDMF
+ self.small_data = "test data" * 10 # 90 B; SDMF
def do_upload_sdmf(self):