-import os, stat, time, weakref
+import os, stat, time
from allmydata.interfaces import RIStorageServer
from allmydata import node
from zope.interface import implements
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
from twisted.application.internet import TimerService
from foolscap.api import Referenceable
from pycryptopp.publickey import rsa
from allmydata import storage_client
from allmydata.immutable.upload import Uploader
from allmydata.immutable.download import Downloader
-from allmydata.immutable.filenode import FileNode, LiteralFileNode
from allmydata.immutable.offloaded import Helper
from allmydata.control import ControlServer
from allmydata.introducer.client import IntroducerClient
from allmydata.util import hashutil, base32, pollmixin, cachedir, log
from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.util.time_format import parse_duration, parse_date
-from allmydata.uri import LiteralFileURI, UnknownURI
-from allmydata.dirnode import DirectoryNode
-from allmydata.mutable.filenode import MutableFileNode
-from allmydata.unknown import UnknownNode
from allmydata.stats import StatsProvider
from allmydata.history import History
-from allmydata.interfaces import IURI, IDirectoryURI, IStatsProducer, \
- IReadonlyDirectoryURI, IFileURI, IMutableFileURI, RIStubClient, \
- UnhandledCapTypeError
+from allmydata.interfaces import IStatsProducer, RIStubClient
+from allmydata.nodemaker import NodeMaker
+
KiB=1024
MiB=1024*KiB
def _make_secret():
return base32.b2a(os.urandom(hashutil.CRYPTO_VAL_SIZE)) + "\n"
+class SecretHolder:
+ def __init__(self, lease_secret):
+ self._lease_secret = lease_secret
+
+ def get_renewal_secret(self):
+ return hashutil.my_renewal_secret_hash(self._lease_secret)
+
+ def get_cancel_secret(self):
+ return hashutil.my_cancel_secret_hash(self._lease_secret)
+
+class KeyGenerator:
+ def __init__(self):
+ self._remote = None
+ self.default_keysize = 2048
+
+ def set_remote_generator(self, keygen):
+ self._remote = keygen
+ def set_default_keysize(self, keysize):
+ """Call this to override the size of the RSA keys created for new
+ mutable files. The default of None means to let mutable.filenode
+ choose its own size, which means 2048 bits."""
+ self.default_keysize = keysize
+
+ def generate(self, keysize=None):
+ keysize = keysize or self.default_keysize
+ if self._remote:
+ d = self._remote.callRemote('get_rsa_key_pair', keysize)
+ def make_key_objs((verifying_key, signing_key)):
+ v = rsa.create_verifying_key_from_string(verifying_key)
+ s = rsa.create_signing_key_from_string(signing_key)
+ return v, s
+ d.addCallback(make_key_objs)
+ return d
+ else:
+ # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
+ # secs
+ signer = rsa.generate(keysize)
+ verifier = signer.get_verifying_key()
+ return defer.succeed( (verifier, signer) )
+
+
class Client(node.Node, pollmixin.PollMixin):
implements(IStatsProducer)
"max_segment_size": 128*KiB,
}
- # set this to override the size of the RSA keys created for new mutable
- # files. The default of None means to let mutable.filenode choose its own
- # size, which means 2048 bits.
- DEFAULT_MUTABLE_KEYSIZE = None
-
def __init__(self, basedir="."):
node.Node.__init__(self, basedir)
self.started_timestamp = time.time()
self.init_control()
if self.get_config("helper", "enabled", False, boolean=True):
self.init_helper()
- self.init_client()
- self._key_generator = None
+ self._key_generator = KeyGenerator()
key_gen_furl = self.get_config("client", "key_generator.furl", None)
if key_gen_furl:
self.init_key_gen(key_gen_furl)
+ self.init_client()
# ControlServer and Helper are attached after Tub startup
self.init_ftp_server()
self.init_sftp_server()
def init_lease_secret(self):
secret_s = self.get_or_create_private_config("secret", _make_secret)
- self._lease_secret = base32.a2b(secret_s)
+ lease_secret = base32.a2b(secret_s)
+ self._secret_holder = SecretHolder(lease_secret)
def init_storage(self):
# should we run a storage server (and publish it for others to use)?
DEP["happy"] = int(self.get_config("client", "shares.happy", DEP["happy"]))
convergence_s = self.get_or_create_private_config('convergence', _make_secret)
self.convergence = base32.a2b(convergence_s)
- self._node_cache = weakref.WeakValueDictionary() # uri -> node
self.init_client_storage_broker()
- self.add_service(History(self.stats_provider))
+ self.history = self.add_service(History(self.stats_provider))
self.add_service(Uploader(helper_furl, self.stats_provider))
download_cachedir = os.path.join(self.basedir,
"private", "cache", "download")
self.download_cache_dirman.setServiceParent(self)
self.add_service(Downloader(self.stats_provider))
self.init_stub_client()
+ self.init_nodemaker()
def init_client_storage_broker(self):
# create a StorageFarmBroker object, for use by Uploader/Downloader
d.addErrback(log.err, facility="tahoe.init",
level=log.BAD, umid="OEHq3g")
+ def init_nodemaker(self):
+ self.nodemaker = NodeMaker(self.storage_broker,
+ self._secret_holder,
+ self.get_history(),
+ self.getServiceNamed("uploader"),
+ self.getServiceNamed("downloader"),
+ self.download_cache_dirman,
+ self.get_encoding_parameters(),
+ self._key_generator)
+
def get_history(self):
return self.getServiceNamed("history")
def init_helper(self):
d = self.when_tub_ready()
def _publish(self):
- h = Helper(os.path.join(self.basedir, "helper"), self.stats_provider)
+ h = Helper(os.path.join(self.basedir, "helper"),
+ self.stats_provider, self.history)
h.setServiceParent(self)
# TODO: this is confusing. BASEDIR/private/helper.furl is created
# by the helper. BASEDIR/helper.furl is consumed by the client
level=log.BAD, umid="z9DMzw")
def _got_key_generator(self, key_generator):
- self._key_generator = key_generator
+ self._key_generator.set_remote_generator(key_generator)
key_generator.notifyOnDisconnect(self._lost_key_generator)
def _lost_key_generator(self):
- self._key_generator = None
+ self._key_generator.set_remote_generator(None)
+
+ def set_default_mutable_keysize(self, keysize):
+ self._key_generator.set_default_keysize(keysize)
def init_web(self, webport):
self.log("init_web(webport=%s)", args=(webport,))
return self.introducer_client.connected_to_introducer()
return False
- def get_renewal_secret(self):
- return hashutil.my_renewal_secret_hash(self._lease_secret)
+ def get_renewal_secret(self): # this will go away
+ return self._secret_holder.get_renewal_secret()
def get_cancel_secret(self):
- return hashutil.my_cancel_secret_hash(self._lease_secret)
+ return self._secret_holder.get_cancel_secret()
def debug_wait_for_client_connections(self, num_clients):
"""Return a Deferred that fires (with None) when we have connections
def create_node_from_uri(self, writecap, readcap=None):
# this returns synchronously.
- u = writecap or readcap
- if not u:
- # maybe the writecap was hidden because we're in a readonly
- # directory, and the future cap format doesn't have a readcap, or
- # something.
- return UnknownNode(writecap, readcap)
- u = IURI(u)
- if isinstance(u, UnknownURI):
- return UnknownNode(writecap, readcap)
- u_s = u.to_string()
- if u_s not in self._node_cache:
- if IReadonlyDirectoryURI.providedBy(u):
- # read-only dirnodes
- node = DirectoryNode(self).init_from_uri(u)
- elif IDirectoryURI.providedBy(u):
- # dirnodes
- node = DirectoryNode(self).init_from_uri(u)
- elif IFileURI.providedBy(u):
- if isinstance(u, LiteralFileURI):
- node = LiteralFileNode(u, self) # LIT
- else:
- node = FileNode(u, self, self.download_cache_dirman) # CHK
- elif IMutableFileURI.providedBy(u):
- node = MutableFileNode(self).init_from_uri(u)
- else:
- raise UnhandledCapTypeError("cap is recognized, but has no Node")
- self._node_cache[u_s] = node # note: WeakValueDictionary
- return self._node_cache[u_s]
+ return self.nodemaker.create_from_cap(writecap, readcap)
def create_empty_dirnode(self):
- d = self.create_mutable_file()
- d.addCallback(DirectoryNode.create_with_mutablefile, self)
- return d
+ return self.nodemaker.create_new_mutable_directory()
def create_mutable_file(self, contents="", keysize=None):
- keysize = keysize or self.DEFAULT_MUTABLE_KEYSIZE
- n = MutableFileNode(self)
- d = n.create(contents, self._generate_pubprivkeys, keysize=keysize)
- d.addCallback(lambda res: n)
- return d
-
- def _generate_pubprivkeys(self, key_size):
- if self._key_generator:
- d = self._key_generator.callRemote('get_rsa_key_pair', key_size)
- def make_key_objs((verifying_key, signing_key)):
- v = rsa.create_verifying_key_from_string(verifying_key)
- s = rsa.create_signing_key_from_string(signing_key)
- return v, s
- d.addCallback(make_key_objs)
- return d
- else:
- # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
- # secs
- signer = rsa.generate(key_size)
- verifier = signer.get_verifying_key()
- return verifier, signer
+ return self.nodemaker.create_mutable_file(contents, keysize)
def upload(self, uploadable):
uploader = self.getServiceNamed("uploader")
return uploader.upload(uploadable, history=self.get_history())
-
-
- def list_all_upload_statuses(self):
- return self.get_history().list_all_upload_statuses()
-
- def list_all_download_statuses(self):
- return self.get_history().list_all_download_statuses()
-
- def list_all_mapupdate_statuses(self):
- return self.get_history().list_all_mapupdate_statuses()
- def list_all_publish_statuses(self):
- return self.get_history().list_all_publish_statuses()
- def list_all_retrieve_statuses(self):
- return self.get_history().list_all_retrieve_statuses()
-
- def list_all_helper_statuses(self):
- try:
- helper = self.getServiceNamed("helper")
- except KeyError:
- return []
- return helper.get_all_upload_statuses()
from allmydata.mutable.filenode import MutableFileNode
from allmydata.unknown import UnknownNode
from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\
- IURI, IFileNode, IMutableFileURI, IFilesystemNode, \
+ IFileNode, IMutableFileURI, IFilesystemNode, \
ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable, \
CannotPackUnknownNodeError
from allmydata.check_results import DeepCheckResults, \
from allmydata.util import hashutil, mathutil, base32, log
from allmydata.util.assertutil import _assert, precondition
from allmydata.util.netstring import netstring, split_netstring
-from allmydata.uri import DirectoryURI, LiteralFileURI, from_string
+from allmydata.uri import DirectoryURI, ReadonlyDirectoryURI, \
+ LiteralFileURI, from_string
from pycryptopp.cipher.aes import AES
class CachingDict(dict):
implements(IDirectoryNode, ICheckable, IDeepCheckable)
filenode_class = MutableFileNode
- def __init__(self, client):
- self._client = client
+ def __init__(self, filenode, nodemaker, uploader):
+ self._node = filenode
+ filenode_uri = IMutableFileURI(filenode.get_uri())
+ if filenode_uri.is_readonly():
+ self._uri = ReadonlyDirectoryURI(filenode_uri)
+ else:
+ self._uri = DirectoryURI(filenode_uri)
+ self._nodemaker = nodemaker
+ self._uploader = uploader
self._most_recent_size = None
def __repr__(self):
return "<%s %s %s>" % (self.__class__.__name__, self.is_readonly() and "RO" or "RW", hasattr(self, '_uri') and self._uri.abbrev())
- def init_from_uri(self, myuri):
- self._uri = IURI(myuri)
- self._node = self.filenode_class(self._client)
- self._node.init_from_uri(self._uri.get_filenode_uri())
- return self
-
- @classmethod
- def create_with_mutablefile(cls, filenode, client):
- self = cls(client)
- self._node = filenode
- return self._filenode_created(filenode)
-
- def create(self, keypair_generator=None, keysize=None):
- """
- Returns a deferred that eventually fires with self once the directory
- has been created (distributed across a set of storage servers).
- """
- # first we create a MutableFileNode with empty_contents, then use its
- # URI to create our own.
- self._node = self.filenode_class(self._client)
- empty_contents = self._pack_contents(CachingDict())
- d = self._node.create(empty_contents, keypair_generator, keysize=keysize)
- d.addCallback(self._filenode_created)
- return d
- def _filenode_created(self, res):
- self._uri = DirectoryURI(IMutableFileURI(self._node.get_uri()))
- return self
def get_size(self):
# return the size of our backing mutable file, in bytes, if we've
return plaintext
def _create_node(self, rwcap, rocap):
- return self._client.create_node_from_uri(rwcap, rocap)
+ return self._nodemaker.create_from_cap(rwcap, rocap)
def _unpack_contents(self, data):
# the directory is serialized as a list of netstrings, one per child.
assert len(e) == 3
name, child_uri, metadata = e
assert isinstance(name, unicode)
+ assert isinstance(child_uri, str)
child_node = self._create_node(child_uri, None)
if isinstance(child_node, UnknownNode):
msg = "cannot pack unknown node as child %s" % str(name)
assert isinstance(name, unicode)
if self.is_readonly():
return defer.fail(NotMutableError())
- d = self._client.upload(uploadable)
+ d = self._uploader.upload(uploadable)
d.addCallback(lambda results: results.uri)
- d.addCallback(self._client.create_node_from_uri)
+ d.addCallback(self._nodemaker.create_from_cap)
d.addCallback(lambda node:
self.set_node(name, node, metadata, overwrite))
return d
assert isinstance(name, unicode)
if self.is_readonly():
return defer.fail(NotMutableError())
- d = self._client.create_empty_dirnode()
+ d = self._nodemaker.create_new_mutable_directory()
def _created(child):
entries = [(name, child, None)]
a = Adder(self, entries, overwrite=overwrite)
self.all_retrieve_status = weakref.WeakKeyDictionary()
self.recent_retrieve_status = []
+ self.all_helper_upload_statuses = weakref.WeakKeyDictionary()
+ self.recent_helper_upload_statuses = []
+
def add_download(self, download_status):
self.all_downloads_statuses[download_status] = None
for s in self.all_retrieve_status:
yield s
+ def notify_helper_upload(self, s):
+ self.all_helper_upload_statuses[s] = None
+ self.recent_helper_upload_statuses.append(s)
+ while len(self.recent_helper_upload_statuses) > self.MAX_UPLOAD_STATUSES:
+ self.recent_helper_upload_statuses.pop(0)
+ def list_all_helper_statuses(self):
+ for s in self.all_helper_upload_statuses:
+ yield s
object that was passed into my constructor whether this task has been
cancelled (by invoking its raise_if_cancelled() method).
"""
- def __init__(self, client, verifycap, servers, verify, add_lease, monitor):
+ def __init__(self, verifycap, servers, verify, add_lease, secret_holder,
+ monitor):
assert precondition(isinstance(verifycap, CHKFileVerifierURI), verifycap, type(verifycap))
assert precondition(isinstance(servers, (set, frozenset)), servers)
for (serverid, serverrref) in servers:
prefix = "%s" % base32.b2a_l(verifycap.storage_index[:8], 60)
log.PrefixingLogMixin.__init__(self, facility="tahoe.immutable.checker", prefix=prefix)
- self._client = client
self._verifycap = verifycap
self._monitor = monitor
self._share_hash_tree = None
- frs = file_renewal_secret_hash(client.get_renewal_secret(),
+ frs = file_renewal_secret_hash(secret_holder.get_renewal_secret(),
self._verifycap.storage_index)
self.file_renewal_secret = frs
- fcs = file_cancel_secret_hash(client.get_cancel_secret(),
+ fcs = file_cancel_secret_hash(secret_holder.get_cancel_secret(),
self._verifycap.storage_index)
self.file_cancel_secret = fcs
from twisted.internet.interfaces import IPushProducer, IConsumer
from twisted.protocols import basic
from foolscap.api import eventually
-from allmydata.interfaces import IFileNode, IFileURI, ICheckable, \
+from allmydata.interfaces import IFileNode, ICheckable, \
IDownloadTarget, IUploadResults
from allmydata.util import dictutil, log, base32
-from allmydata.util.assertutil import precondition
from allmydata import uri as urimodule
from allmydata.immutable.checker import Checker
from allmydata.check_results import CheckResults, CheckAndRepairResults
class _ImmutableFileNodeBase(object):
implements(IFileNode, ICheckable)
- def __init__(self, uri, client):
- precondition(urimodule.IImmutableFileURI.providedBy(uri), uri)
- self.u = IFileURI(uri)
- self._client = client
-
def get_readonly_uri(self):
return self.get_uri()
class DownloadCache:
implements(IDownloadTarget)
- def __init__(self, node, cachedirectorymanager):
- self._downloader = node._client.getServiceNamed("downloader")
- self._uri = node.get_uri()
- self._storage_index = node.get_storage_index()
+ def __init__(self, filecap, storage_index, downloader,
+ cachedirectorymanager):
+ self._downloader = downloader
+ self._uri = filecap
+ self._storage_index = storage_index
self.milestones = set() # of (offset,size,Deferred)
self.cachedirectorymanager = cachedirectorymanager
self.cachefile = None
pass
def finish(self):
return None
- # The following methods are just because the target might be a repairer.DownUpConnector,
- # and just because the current CHKUpload object expects to find the storage index and
- # encoding parameters in its Uploadable.
+ # The following methods are just because the target might be a
+ # repairer.DownUpConnector, and just because the current CHKUpload object
+ # expects to find the storage index and encoding parameters in its
+ # Uploadable.
def set_storageindex(self, storageindex):
pass
def set_encodingparams(self, encodingparams):
class FileNode(_ImmutableFileNodeBase, log.PrefixingLogMixin):
- def __init__(self, uri, client, cachedirectorymanager):
- _ImmutableFileNodeBase.__init__(self, uri, client)
- self.download_cache = DownloadCache(self, cachedirectorymanager)
- prefix = uri.get_verify_cap().to_string()
+ def __init__(self, filecap, storage_broker, secret_holder,
+ downloader, history, cachedirectorymanager):
+ assert isinstance(filecap, str)
+ self.u = urimodule.CHKFileURI.init_from_string(filecap)
+ self._storage_broker = storage_broker
+ self._secret_holder = secret_holder
+ self._downloader = downloader
+ self._history = history
+ storage_index = self.get_storage_index()
+ self.download_cache = DownloadCache(filecap, storage_index, downloader,
+ cachedirectorymanager)
+ prefix = self.u.get_verify_cap().to_string()
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.filenode", prefix=prefix)
self.log("starting", level=log.OPERATIONAL)
def check_and_repair(self, monitor, verify=False, add_lease=False):
verifycap = self.get_verify_cap()
- sb = self._client.get_storage_broker()
+ sb = self._storage_broker
servers = sb.get_all_servers()
+ sh = self._secret_holder
- c = Checker(client=self._client, verifycap=verifycap, servers=servers,
- verify=verify, add_lease=add_lease, monitor=monitor)
+ c = Checker(verifycap=verifycap, servers=servers,
+ verify=verify, add_lease=add_lease, secret_holder=sh,
+ monitor=monitor)
d = c.start()
def _maybe_repair(cr):
crr = CheckAndRepairResults(self.u.storage_index)
crr.repair_successful = False
crr.repair_failure = f
return f
- r = Repairer(client=self._client, verifycap=verifycap, monitor=monitor)
+ r = Repairer(storage_broker=sb, secret_holder=sh,
+ verifycap=verifycap, monitor=monitor)
d = r.start()
d.addCallbacks(_gather_repair_results, _repair_error)
return d
def check(self, monitor, verify=False, add_lease=False):
verifycap = self.get_verify_cap()
- sb = self._client.get_storage_broker()
+ sb = self._storage_broker
servers = sb.get_all_servers()
+ sh = self._secret_holder
- v = Checker(client=self._client, verifycap=verifycap, servers=servers,
- verify=verify, add_lease=add_lease, monitor=monitor)
+ v = Checker(verifycap=verifycap, servers=servers,
+ verify=verify, add_lease=add_lease, secret_holder=sh,
+ monitor=monitor)
return v.start()
def read(self, consumer, offset=0, size=None):
return d
def download(self, target):
- downloader = self._client.getServiceNamed("downloader")
- history = self._client.get_history()
- return downloader.download(self.get_uri(), target, self._parentmsgid,
- history=history)
+ return self._downloader.download(self.get_uri(), target,
+ self._parentmsgid,
+ history=self._history)
def download_to_data(self):
- downloader = self._client.getServiceNamed("downloader")
- history = self._client.get_history()
- return downloader.download_to_data(self.get_uri(), history=history)
+ return self._downloader.download_to_data(self.get_uri(),
+ history=self._history)
class LiteralProducer:
implements(IPushProducer)
class LiteralFileNode(_ImmutableFileNodeBase):
- def __init__(self, uri, client):
- precondition(urimodule.IImmutableFileURI.providedBy(uri), uri)
- _ImmutableFileNodeBase.__init__(self, uri, client)
+ def __init__(self, filecap):
+ assert isinstance(filecap, str)
+ self.u = urimodule.LiteralFileURI.init_from_string(filecap)
def get_uri(self):
return self.u.to_string()
self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
parent=log_number)
- self._client = helper.parent
+ client = helper.parent
+ self._storage_broker = client.get_storage_broker()
+ self._secret_holder = client._secret_holder
self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
self._log_number)
self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
chk_upload_helper_class = CHKUploadHelper
MAX_UPLOAD_STATUSES = 10
- def __init__(self, basedir, stats_provider=None):
+ def __init__(self, basedir, stats_provider=None, history=None):
self._basedir = basedir
self._chk_incoming = os.path.join(basedir, "CHK_incoming")
self._chk_encoding = os.path.join(basedir, "CHK_encoding")
fileutil.make_dirs(self._chk_encoding)
self._active_uploads = {}
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
- self._all_upload_statuses = weakref.WeakKeyDictionary()
- self._recent_upload_statuses = []
self.stats_provider = stats_provider
if stats_provider:
stats_provider.register_producer(self)
"chk_upload_helper.fetched_bytes": 0,
"chk_upload_helper.encoded_bytes": 0,
}
+ self._history = history
service.MultiService.__init__(self)
def setServiceParent(self, parent):
def _add_upload(self, uh):
self._all_uploads[uh] = None
- s = uh.get_upload_status()
- self._all_upload_statuses[s] = None
- self._recent_upload_statuses.append(s)
- while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
- self._recent_upload_statuses.pop(0)
+ if self._history:
+ s = uh.get_upload_status()
+ self._history.notify_helper_upload(s)
def upload_finished(self, storage_index, size):
# this is called with size=0 if the upload failed
del self._active_uploads[storage_index]
s = uh.get_upload_status()
s.set_active(False)
-
- def get_all_upload_statuses(self):
- return self._all_upload_statuses
cancelled (by invoking its raise_if_cancelled() method).
"""
- def __init__(self, client, verifycap, monitor):
+ def __init__(self, storage_broker, secret_holder, verifycap, monitor):
assert precondition(isinstance(verifycap, CHKFileVerifierURI))
logprefix = si_b2a(verifycap.storage_index)[:5]
log.PrefixingLogMixin.__init__(self, "allmydata.immutable.repairer",
prefix=logprefix)
- self._client = client
+ self._storage_broker = storage_broker
+ self._secret_holder = secret_holder
self._verifycap = verifycap
self._monitor = monitor
def start(self):
self.log("starting repair")
duc = DownUpConnector()
- sb = self._client.get_storage_broker()
- dl = download.CiphertextDownloader(sb, self._verifycap, target=duc,
+ dl = download.CiphertextDownloader(self._storage_broker,
+ self._verifycap, target=duc,
monitor=self._monitor)
- ul = upload.CHKUploader(self._client)
+ ul = upload.CHKUploader(self._storage_broker, self._secret_holder)
d = defer.Deferred()
def __repr__(self):
return "<Tahoe2PeerSelector for upload %s>" % self.upload_id
- def get_shareholders(self, client,
+ def get_shareholders(self, storage_broker, secret_holder,
storage_index, share_size, block_size,
num_segments, total_shares, shares_of_happiness):
"""
self.use_peers = set() # PeerTrackers that have shares assigned to them
self.preexisting_shares = {} # sharenum -> peerid holding the share
- sb = client.get_storage_broker()
- peers = sb.get_servers_for_index(storage_index)
+ peers = storage_broker.get_servers_for_index(storage_index)
if not peers:
raise NoServersError("client gave us zero peers")
raise NoServersError("no peers could accept an allocated_size of %d" % allocated_size)
# decide upon the renewal/cancel secrets, to include them in the
- # allocat_buckets query.
- client_renewal_secret = client.get_renewal_secret()
- client_cancel_secret = client.get_cancel_secret()
+ # allocate_buckets query.
+ client_renewal_secret = secret_holder.get_renewal_secret()
+ client_cancel_secret = secret_holder.get_cancel_secret()
file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
storage_index)
class CHKUploader:
peer_selector_class = Tahoe2PeerSelector
- def __init__(self, client):
- self._client = client
- self._log_number = self._client.log("CHKUploader starting")
+ def __init__(self, storage_broker, secret_holder):
+ # peer_selector needs storage_broker and secret_holder
+ self._storage_broker = storage_broker
+ self._secret_holder = secret_holder
+ self._log_number = self.log("CHKUploader starting", parent=None)
self._encoder = None
self._results = UploadResults()
self._storage_index = None
kwargs["parent"] = self._log_number
if "facility" not in kwargs:
kwargs["facility"] = "tahoe.upload"
- return self._client.log(*args, **kwargs)
+ return log.msg(*args, **kwargs)
def start(self, encrypted_uploadable):
"""Start uploading the file.
def locate_all_shareholders(self, encoder, started):
peer_selection_started = now = time.time()
self._storage_index_elapsed = now - started
+ storage_broker = self._storage_broker
+ secret_holder = self._secret_holder
storage_index = encoder.get_param("storage_index")
self._storage_index = storage_index
upload_id = si_b2a(storage_index)[:5]
k,desired,n = encoder.get_param("share_counts")
self._peer_selection_started = time.time()
- d = peer_selector.get_shareholders(self._client, storage_index,
+ d = peer_selector.get_shareholders(storage_broker, secret_holder,
+ storage_index,
share_size, block_size,
num_segments, n, desired)
def _done(res):
class LiteralUploader:
- def __init__(self, client):
- self._client = client
+ def __init__(self):
self._results = UploadResults()
self._status = s = UploadStatus()
s.set_storage_index(None)
self.stats_provider.count('uploader.bytes_uploaded', size)
if size <= self.URI_LIT_SIZE_THRESHOLD:
- uploader = LiteralUploader(self.parent)
+ uploader = LiteralUploader()
return uploader.start(uploadable)
else:
eu = EncryptAnUploadable(uploadable, self._parentmsgid)
d2.addCallback(lambda x: eu.get_storage_index())
d2.addCallback(lambda si: uploader.start(eu, si))
else:
- uploader = CHKUploader(self.parent)
+ storage_broker = self.parent.get_storage_broker()
+ secret_holder = self.parent._secret_holder
+ uploader = CHKUploader(storage_broker, secret_holder)
d2.addCallback(lambda x: uploader.start(eu))
self._all_uploads[uploader] = None
class IFilesystemNode(Interface):
def get_uri():
"""
- Return the URI that can be used by others to get access to this
- node. If this node is read-only, the URI will only offer read-only
- access. If this node is read-write, the URI will offer read-write
- access.
+ Return the URI string that can be used by others to get access to
+ this node. If this node is read-only, the URI will only offer
+ read-only access. If this node is read-write, the URI will offer
+ read-write access.
If you have read-write access to a node and wish to share merely
read-only access with others, use get_readonly_uri().
"""
def get_readonly_uri():
- """Return the directory URI that can be used by others to get
- read-only access to this directory node. The result is a read-only
- URI, regardless of whether this dirnode is read-only or read-write.
+ """Return the URI string that can be used by others to get read-only
+ access to this node. The result is a read-only URI, regardless of
+ whether this node is read-only or read-write.
- If you have merely read-only access to this dirnode,
- get_readonly_uri() will return the same thing as get_uri().
+ If you have merely read-only access to this node, get_readonly_uri()
+ will return the same thing as get_uri().
"""
def get_repair_cap():
class MutableChecker:
- def __init__(self, node, monitor):
+ def __init__(self, node, storage_broker, history, monitor):
self._node = node
+ self._storage_broker = storage_broker
+ self._history = history
self._monitor = monitor
self.bad_shares = [] # list of (nodeid,shnum,failure)
self._storage_index = self._node.get_storage_index()
def check(self, verify=False, add_lease=False):
servermap = ServerMap()
- u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK,
- add_lease=add_lease)
- history = self._node._client.get_history()
- if history:
- history.notify_mapupdate(u.get_status())
+ u = ServermapUpdater(self._node, self._storage_broker, self._monitor,
+ servermap, MODE_CHECK, add_lease=add_lease)
+ if self._history:
+ self._history.notify_mapupdate(u.get_status())
d = u.update()
d.addCallback(self._got_mapupdate_results)
if verify:
class MutableCheckAndRepairer(MutableChecker):
- def __init__(self, node, monitor):
- MutableChecker.__init__(self, node, monitor)
+ def __init__(self, node, storage_broker, history, monitor):
+ MutableChecker.__init__(self, node, storage_broker, history, monitor)
self.cr_results = CheckAndRepairResults(self._storage_index)
self.cr_results.pre_repair_results = self.results
self.need_repair = False
ICheckable, ICheckResults, NotEnoughSharesError
from allmydata.util import hashutil, log
from allmydata.util.assertutil import precondition
-from allmydata.uri import WriteableSSKFileURI
+from allmydata.uri import WriteableSSKFileURI, ReadonlySSKFileURI
from allmydata.monitor import Monitor
-from pycryptopp.publickey import rsa
from pycryptopp.cipher.aes import AES
from publish import Publish
reactor.callLater(self._delay, d.callback, None)
return d
-# use client.create_mutable_file() to make one of these
+# use nodemaker.create_mutable_file() to make one of these
class MutableFileNode:
implements(IMutableFileNode, ICheckable)
- SIGNATURE_KEY_SIZE = 2048
- checker_class = MutableChecker
- check_and_repairer_class = MutableCheckAndRepairer
- def __init__(self, client):
- self._client = client
+ def __init__(self, storage_broker, secret_holder,
+ default_encoding_parameters, history):
+ self._storage_broker = storage_broker
+ self._secret_holder = secret_holder
+ self._default_encoding_parameters = default_encoding_parameters
+ self._history = history
self._pubkey = None # filled in upon first read
self._privkey = None # filled in if we're mutable
# we keep track of the last encoding parameters that we use. These
# are updated upon retrieve, and used by publish. If we publish
# without ever reading (i.e. overwrite()), then we use these values.
- defaults = client.get_encoding_parameters()
- self._required_shares = defaults["k"]
- self._total_shares = defaults["n"]
+ self._required_shares = default_encoding_parameters["k"]
+ self._total_shares = default_encoding_parameters["n"]
self._sharemap = {} # known shares, shnum-to-[nodeids]
self._cache = ResponseCache()
else:
return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None)
- def init_from_uri(self, myuri):
+ def init_from_uri(self, filecap):
# we have the URI, but we have not yet retrieved the public
# verification key, nor things like 'k' or 'N'. If and when someone
# wants to get our contents, we'll pull from shares and fill those
# in.
- self._uri = IMutableFileURI(myuri)
- if not self._uri.is_readonly():
+ assert isinstance(filecap, str)
+ if filecap.startswith("URI:SSK:"):
+ self._uri = WriteableSSKFileURI.init_from_string(filecap)
self._writekey = self._uri.writekey
else:
+ assert filecap.startswith("URI:SSK-RO:")
+ self._uri = ReadonlySSKFileURI.init_from_string(filecap)
self._writekey = None
self._readkey = self._uri.readkey
self._storage_index = self._uri.storage_index
self._encprivkey = None
return self
- def create(self, initial_contents, keypair_generator=None, keysize=None):
- """Call this when the filenode is first created. This will generate
- the keys, generate the initial shares, wait until at least numpeers
- are connected, allocate shares, and upload the initial
- contents. Returns a Deferred that fires (with the MutableFileNode
- instance you should use) when it completes.
+ def create_with_keys(self, (pubkey, privkey), initial_contents):
+ """Call this to create a brand-new mutable file. It will create the
+ shares, find homes for them, and upload the initial contents. Returns
+ a Deferred that fires (with the MutableFileNode instance you should
+ use) when it completes.
"""
- keysize = keysize or self.SIGNATURE_KEY_SIZE
- d = defer.maybeDeferred(self._generate_pubprivkeys,
- keypair_generator, keysize)
- d.addCallback(self._generated)
- d.addCallback(lambda res: self._upload(initial_contents, None))
- return d
-
- def _generated(self, (pubkey, privkey) ):
self._pubkey, self._privkey = pubkey, privkey
pubkey_s = self._pubkey.serialize()
privkey_s = self._privkey.serialize()
self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint)
self._readkey = self._uri.readkey
self._storage_index = self._uri.storage_index
-
- def _generate_pubprivkeys(self, keypair_generator, keysize):
- if keypair_generator:
- return keypair_generator(keysize)
- else:
- # RSA key generation for a 2048 bit key takes between 0.8 and 3.2
- # secs
- signer = rsa.generate(keysize)
- verifier = signer.get_verifying_key()
- return verifier, signer
+ return self._upload(initial_contents, None)
def _encrypt_privkey(self, writekey, privkey):
enc = AES(writekey)
return hashutil.ssk_write_enabler_hash(self._writekey, peerid)
def get_renewal_secret(self, peerid):
assert len(peerid) == 20
- crs = self._client.get_renewal_secret()
+ crs = self._secret_holder.get_renewal_secret()
frs = hashutil.file_renewal_secret_hash(crs, self._storage_index)
return hashutil.bucket_renewal_secret_hash(frs, peerid)
def get_cancel_secret(self, peerid):
assert len(peerid) == 20
- ccs = self._client.get_cancel_secret()
+ ccs = self._secret_holder.get_cancel_secret()
fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index)
return hashutil.bucket_cancel_secret_hash(fcs, peerid)
def get_readonly(self):
if self.is_readonly():
return self
- ro = MutableFileNode(self._client)
- ro.init_from_uri(self._uri.get_readonly())
+ ro = MutableFileNode(self._storage_broker, self._secret_holder,
+ self._default_encoding_parameters, self._history)
+ ro.init_from_uri(self.get_readonly_uri())
return ro
def get_readonly_uri(self):
# ICheckable
def check(self, monitor, verify=False, add_lease=False):
- checker = self.checker_class(self, monitor)
+ checker = MutableChecker(self, self._storage_broker,
+ self._history, monitor)
return checker.check(verify, add_lease)
def check_and_repair(self, monitor, verify=False, add_lease=False):
- checker = self.check_and_repairer_class(self, monitor)
+ checker = MutableCheckAndRepairer(self, self._storage_broker,
+ self._history, monitor)
return checker.check(verify, add_lease)
#################################
servermap = ServerMap()
return self._update_servermap(servermap, mode)
def _update_servermap(self, servermap, mode):
- u = ServermapUpdater(self, Monitor(), servermap, mode)
- history = self._client.get_history()
- if history:
- history.notify_mapupdate(u.get_status())
+ u = ServermapUpdater(self, self._storage_broker, Monitor(), servermap,
+ mode)
+ if self._history:
+ self._history.notify_mapupdate(u.get_status())
return u.update()
def download_version(self, servermap, version, fetch_privkey=False):
def _try_once_to_download_version(self, servermap, version,
fetch_privkey=False):
r = Retrieve(self, servermap, version, fetch_privkey)
- history = self._client.get_history()
- if history:
- history.notify_retrieve(r.get_status())
+ if self._history:
+ self._history.notify_retrieve(r.get_status())
return r.download()
def upload(self, new_contents, servermap):
return self._do_serialized(self._upload, new_contents, servermap)
def _upload(self, new_contents, servermap):
assert self._pubkey, "update_servermap must be called before publish"
- p = Publish(self, servermap)
- history = self._client.get_history()
- if history:
- history.notify_publish(p.get_status(), len(new_contents))
+ p = Publish(self, self._storage_broker, servermap)
+ if self._history:
+ self._history.notify_publish(p.get_status(), len(new_contents))
return p.publish(new_contents)
To make the initial publish, set servermap to None.
"""
- def __init__(self, filenode, servermap):
+ def __init__(self, filenode, storage_broker, servermap):
self._node = filenode
+ self._storage_broker = storage_broker
self._servermap = servermap
self._storage_index = self._node.get_storage_index()
self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
- num = self._node._client.log("Publish(%s): starting" % prefix)
+ num = self.log("Publish(%s): starting" % prefix, parent=None)
self._log_number = num
self._running = True
self._first_write_error = None
assert self._privkey
self._encprivkey = self._node.get_encprivkey()
- sb = self._node._client.get_storage_broker()
+ sb = self._storage_broker
full_peerlist = sb.get_servers_for_index(self._storage_index)
self.full_peerlist = full_peerlist # for use later, immutable
self.bad_peers = set() # peerids who have errbacked/refused requests
class ServermapUpdater:
- def __init__(self, filenode, monitor, servermap, mode=MODE_READ,
- add_lease=False):
+ def __init__(self, filenode, storage_broker, monitor, servermap,
+ mode=MODE_READ, add_lease=False):
"""I update a servermap, locating a sufficient number of useful
shares and remembering where they are located.
"""
self._node = filenode
+ self._storage_broker = storage_broker
self._monitor = monitor
self._servermap = servermap
self.mode = mode
self._queries_completed = 0
- sb = self._node._client.get_storage_broker()
- full_peerlist = sb.get_servers_for_index(self._node._storage_index)
+ sb = self._storage_broker
+ full_peerlist = sb.get_servers_for_index(self._storage_index)
self.full_peerlist = full_peerlist # for use later, immutable
self.extra_peers = full_peerlist[:] # peers are removed as we use them
self._good_peers = set() # peers who had some shares
--- /dev/null
+import weakref
+from allmydata.immutable.filenode import FileNode, LiteralFileNode
+from allmydata.mutable.filenode import MutableFileNode
+from allmydata.dirnode import DirectoryNode
+from allmydata.unknown import UnknownNode
+from allmydata.uri import DirectoryURI, ReadonlyDirectoryURI
+
+# the "node maker" is a two-argument callable (really a 'create' method on a
+# NodeMaker instance) which accepts a URI string (and an optional readcap
+# string, for use by dirnode.copy) and returns an object which (at the very
+# least) provides IFilesystemNode. That interface has other methods that can
+# be used to determine if the node represents a file or directory, in which
+# case other methods are available (like download() or modify()). Each Tahoe
+# process will typically have a single NodeMaker, but unit tests may create
+# simplified/mocked forms for test purposes.
+
+# any authorities which fsnodes will need (like a reference to the
+# StorageFarmBroker, to access storage servers for publish/retrieve/download)
+# will be retained as attributes inside the NodeMaker and passed to fsnodes
+# as necessary.
+
+class NodeMaker:
+ def __init__(self, storage_broker, secret_holder, history,
+ uploader, downloader, download_cache_dirman,
+ default_encoding_parameters, key_generator):
+ self.storage_broker = storage_broker
+ self.secret_holder = secret_holder
+ self.history = history
+ self.uploader = uploader
+ self.downloader = downloader
+ self.download_cache_dirman = download_cache_dirman
+ self.default_encoding_parameters = default_encoding_parameters
+ self.key_generator = key_generator
+
+ self._node_cache = weakref.WeakValueDictionary() # uri -> node
+
+ def _create_lit(self, cap):
+ return LiteralFileNode(cap)
+ def _create_immutable(self, cap):
+ return FileNode(cap, self.storage_broker, self.secret_holder,
+ self.downloader, self.history,
+ self.download_cache_dirman)
+ def _create_mutable(self, cap):
+ n = MutableFileNode(self.storage_broker, self.secret_holder,
+ self.default_encoding_parameters,
+ self.history)
+ return n.init_from_uri(cap)
+ def _create_dirnode(self, filenode):
+ return DirectoryNode(filenode, self, self.uploader)
+
+ def create_from_cap(self, writecap, readcap=None):
+ # this returns synchronously.
+ assert isinstance(writecap, (str, type(None))), type(writecap)
+ assert isinstance(readcap, (str, type(None))), type(readcap)
+ cap = writecap or readcap
+ if not cap:
+ # maybe the writecap was hidden because we're in a readonly
+ # directory, and the future cap format doesn't have a readcap, or
+ # something.
+ return UnknownNode(writecap, readcap)
+ if cap in self._node_cache:
+ return self._node_cache[cap]
+ elif cap.startswith("URI:LIT:"):
+ node = self._create_lit(cap)
+ elif cap.startswith("URI:CHK:"):
+ node = self._create_immutable(cap)
+ elif cap.startswith("URI:SSK-RO:") or cap.startswith("URI:SSK:"):
+ node = self._create_mutable(cap)
+ elif cap.startswith("URI:DIR2-RO:") or cap.startswith("URI:DIR2:"):
+ if cap.startswith("URI:DIR2-RO:"):
+ dircap = ReadonlyDirectoryURI.init_from_string(cap)
+ elif cap.startswith("URI:DIR2:"):
+ dircap = DirectoryURI.init_from_string(cap)
+ filecap = dircap.get_filenode_uri().to_string()
+ filenode = self.create_from_cap(filecap)
+ node = self._create_dirnode(filenode)
+ else:
+ return UnknownNode(writecap, readcap) # don't cache UnknownNode
+ self._node_cache[cap] = node # note: WeakValueDictionary
+ return node
+
+
+ def create_mutable_file(self, contents="", keysize=None):
+ n = MutableFileNode(self.storage_broker, self.secret_holder,
+ self.default_encoding_parameters, self.history)
+ d = self.key_generator.generate(keysize)
+ d.addCallback(n.create_with_keys, contents)
+ d.addCallback(lambda res: n)
+ return d
+
+ def create_new_mutable_directory(self, initial_children={}):
+ if initial_children:
+ raise NotImplementedError("initial_children= not implemented yet")
+ d = self.create_mutable_file()
+ d.addCallback(self._create_dirnode)
+ return d
self.download_cache_dirman = cachedir.CacheDirectoryManager(download_cachedir.name)
def getServiceNamed(self, name):
return None
+ def get_storage_broker(self):
+ return None
+ _secret_holder=None
+ def get_history(self):
+ return None
def get_encoding_parameters(self):
return {"k": 3, "n": 10}
def get_writekey(self):
return os.urandom(16)
+ def create_node_from_uri(self, writecap, readcap):
+ return None
class FakeMutableFileNode(mut_filenode.MutableFileNode):
- def __init__(self, client):
- mut_filenode.MutableFileNode.__init__(self, client)
+ def __init__(self, *args, **kwargs):
+ mut_filenode.MutableFileNode.__init__(self, *args, **kwargs)
self._uri = uri.WriteableSSKFileURI(randutil.insecurerandstr(16), randutil.insecurerandstr(32))
class FakeDirectoryNode(dirnode.DirectoryNode):
def __init__(self, client):
dirnode.DirectoryNode.__init__(self, client)
mutfileuri = uri.WriteableSSKFileURI(randutil.insecurerandstr(16), randutil.insecurerandstr(32))
- myuri = uri.DirectoryURI(mutfileuri)
+ myuri = uri.DirectoryURI(mutfileuri).to_string()
self.init_from_uri(myuri)
packstr = None
fakeclient = FakeClient()
testdirnode = dirnode.DirectoryNode(fakeclient)
-testdirnode.init_from_uri(uri.DirectoryURI(uri.WriteableSSKFileURI(randutil.insecurerandstr(16), randutil.insecurerandstr(32))))
+testdirnode.init_from_uri(uri.DirectoryURI(uri.WriteableSSKFileURI(randutil.insecurerandstr(16), randutil.insecurerandstr(32))).to_string())
def random_unicode(l):
while True:
def random_fsnode():
coin = random.randrange(0, 3)
if coin == 0:
- return immut_filenode.FileNode(uri.CHKFileURI(randutil.insecurerandstr(16), randutil.insecurerandstr(32), random.randrange(1, 5), random.randrange(6, 15), random.randrange(99, 1000000000000)), fakeclient, None)
+ return immut_filenode.FileNode(uri.CHKFileURI(randutil.insecurerandstr(16), randutil.insecurerandstr(32), random.randrange(1, 5), random.randrange(6, 15), random.randrange(99, 1000000000000)).to_string(), None, None, None, None, None)
elif coin == 1:
- return FakeMutableFileNode(fakeclient)
+ encoding_parameters = {"k": 3, "n": 10}
+ return FakeMutableFileNode(None, None, encoding_parameters, None)
else:
assert coin == 2
return FakeDirectoryNode(fakeclient)
def run_benchmarks(profile=False):
for (func, initfunc) in [(unpack, init_for_unpack), (pack, init_for_pack), (unpack_and_repack, init_for_unpack)]:
print "benchmarking %s" % (func,)
- benchutil.bench(unpack_and_repack, initfunc=init_for_unpack, TOPXP=12, profile=profile, profresults=PROF_FILE_NAME)
+ benchutil.bench(unpack_and_repack, initfunc=init_for_unpack, TOPXP=12)#, profile=profile, profresults=PROF_FILE_NAME)
def print_stats():
s = hotshot.stats.load(PROF_FILE_NAME)
from foolscap.api import flushEventualQueue, fireEventually
from allmydata import uri, dirnode, client
from allmydata.introducer.server import IntroducerNode
-from allmydata.interfaces import IURI, IMutableFileNode, IFileNode, \
+from allmydata.interfaces import IMutableFileNode, IFileNode, \
FileTooLargeError, NotEnoughSharesError, ICheckable
from allmydata.check_results import CheckResults, CheckAndRepairResults, \
DeepCheckResults, DeepCheckAndRepairResults
all_contents = {}
bad_shares = {}
- def __init__(self, u, thisclient):
- precondition(IURI.providedBy(u), u)
- self.client = thisclient
- self.my_uri = u
- self.storage_index = u.storage_index
+ def __init__(self, filecap):
+ precondition(isinstance(filecap, str), filecap)
+ self.my_uri = uri.CHKFileURI.init_from_string(filecap)
+ self.storage_index = self.my_uri.storage_index
def get_uri(self):
return self.my_uri.to_string()
return d
def make_chk_file_uri(size):
- return uri.CHKFileURI(key=os.urandom(16),
- uri_extension_hash=os.urandom(32),
- needed_shares=3,
- total_shares=10,
- size=size)
-
-def create_chk_filenode(thisclient, contents):
- u = make_chk_file_uri(len(contents))
- n = FakeCHKFileNode(u, thisclient)
- FakeCHKFileNode.all_contents[u.to_string()] = contents
+ u = uri.CHKFileURI(key=os.urandom(16),
+ uri_extension_hash=os.urandom(32),
+ needed_shares=3,
+ total_shares=10,
+ size=size)
+ return u.to_string()
+
+def create_chk_filenode(contents):
+ filecap = make_chk_file_uri(len(contents))
+ n = FakeCHKFileNode(filecap)
+ FakeCHKFileNode.all_contents[filecap] = contents
return n
all_contents = {}
bad_shares = {}
- def __init__(self, thisclient):
- self.client = thisclient
- self.my_uri = make_mutable_file_uri()
- self.storage_index = self.my_uri.storage_index
+ def __init__(self, storage_broker, secret_holder,
+ default_encoding_parameters, history):
+ self.init_from_uri(make_mutable_file_uri())
def create(self, initial_contents, key_generator=None, keysize=None):
if len(initial_contents) > self.MUTABLE_SIZELIMIT:
raise FileTooLargeError("SDMF is limited to one segment, and "
self.MUTABLE_SIZELIMIT))
self.all_contents[self.storage_index] = initial_contents
return defer.succeed(self)
- def init_from_uri(self, myuri):
- self.my_uri = IURI(myuri)
+ def init_from_uri(self, filecap):
+ assert isinstance(filecap, str)
+ if filecap.startswith("URI:SSK:"):
+ self.my_uri = uri.WriteableSSKFileURI.init_from_string(filecap)
+ else:
+ assert filecap.startswith("URI:SSK-RO:")
+ self.my_uri = uri.ReadonlySSKFileURI.init_from_string(filecap)
self.storage_index = self.my_uri.storage_index
return self
def get_uri(self):
def make_mutable_file_uri():
return uri.WriteableSSKFileURI(writekey=os.urandom(16),
- fingerprint=os.urandom(32))
+ fingerprint=os.urandom(32)).to_string()
def make_verifier_uri():
return uri.SSKVerifierURI(storage_index=os.urandom(16),
- fingerprint=os.urandom(32))
+ fingerprint=os.urandom(32)).to_string()
class FakeDirectoryNode(dirnode.DirectoryNode):
"""This offers IDirectoryNode, but uses a FakeMutableFileNode for the
# will have registered the helper furl).
c = self.add_service(client.Client(basedir=basedirs[0]))
self.clients.append(c)
- c.DEFAULT_MUTABLE_KEYSIZE = 522
+ c.set_default_mutable_keysize(522)
d = c.when_tub_ready()
def _ready(res):
f = open(os.path.join(basedirs[0],"private","helper.furl"), "r")
for i in range(1, self.numclients):
c = self.add_service(client.Client(basedir=basedirs[i]))
self.clients.append(c)
- c.DEFAULT_MUTABLE_KEYSIZE = 522
+ c.set_default_mutable_keysize(522)
log.msg("STARTING")
return self.wait_for_connections()
d.addCallback(_ready)
def _stopped(res):
new_c = client.Client(basedir=self.getdir("client%d" % num))
self.clients[num] = new_c
- new_c.DEFAULT_MUTABLE_KEYSIZE = 522
+ new_c.set_default_mutable_keysize(522)
self.add_service(new_c)
return new_c.when_tub_ready()
d.addCallback(_stopped)
c = client.Client(basedir=basedir)
self.clients.append(c)
- c.DEFAULT_MUTABLE_KEYSIZE = 522
+ c.set_default_mutable_keysize(522)
self.numclients += 1
if add_to_sparent:
c.setServiceParent(self.sparent)
cl0.DEFAULT_ENCODING_PARAMETERS['max_segment_size'] = 12
d2 = cl0.upload(immutable.upload.Data(TEST_DATA, convergence=""))
def _after_upload(u):
- self.uri = IURI(u.uri)
- return cl0.create_node_from_uri(self.uri)
+ filecap = u.uri
+ self.uri = uri.CHKFileURI.init_from_string(filecap)
+ return cl0.create_node_from_uri(filecap)
d2.addCallback(_after_upload)
return d2
d.addCallback(_upload_a_file)
return res
d.addCallback(_return_membrane)
if self.post_call_notifier:
- d.addCallback(self.post_call_notifier, methname)
+ d.addCallback(self.post_call_notifier, self, methname)
return d
def notifyOnDisconnect(self, f, *args, **kwargs):
return None
class NoNetworkClient(Client):
- DEFAULT_MUTABLE_KEYSIZE = 522
-
def create_tub(self):
pass
def init_introducer_client(self):
for i in range(num_servers):
ss = self.make_server(i)
self.add_server(i, ss)
+ self.rebuild_serverlist()
for i in range(num_clients):
clientid = hashutil.tagged_hash("clientid", str(i))[:20]
c = client_config_hooks[i](clientdir)
if not c:
c = NoNetworkClient(clientdir)
+ c.set_default_mutable_keysize(522)
c.nodeid = clientid
c.short_nodeid = b32encode(clientid).lower()[:8]
c._servers = self.all_servers # can be updated later
serverid = ss.my_nodeid
self.servers_by_number[i] = ss
self.servers_by_id[serverid] = wrap_storage_server(ss)
+ self.rebuild_serverlist()
+
+ def rebuild_serverlist(self):
self.all_servers = frozenset(self.servers_by_id.items())
for c in self.clients:
c._servers = self.all_servers
+ def remove_server(self, serverid):
+ # it's enough to remove the server from c._servers (we don't actually
+ # have to detach and stopService it)
+ for i,ss in self.servers_by_number.items():
+ if ss.my_nodeid == serverid:
+ del self.servers_by_number[i]
+ break
+ del self.servers_by_id[serverid]
+ self.rebuild_serverlist()
+
+ def break_server(self, serverid):
+ # mark the given server as broken, so it will throw exceptions when
+ # asked to hold a share
+ self.servers_by_id[serverid].broken = True
+
class GridTestMixin:
def setUp(self):
self.s = service.MultiService()
self.subdir_node = subdir_node
kids = []
for i in range(1, COUNT):
- litnode = LiteralFileURI("%03d-data" % i)
+ litnode = LiteralFileURI("%03d-data" % i).to_string()
kids.append( (u"%03d-small" % i, litnode) )
return subdir_node.set_children(kids)
d.addCallback(_add_children)
import time
-from zope.interface import implements
from twisted.trial import unittest
from twisted.internet import defer
from allmydata import uri, dirnode
from allmydata.client import Client
from allmydata.immutable import upload
-from allmydata.interfaces import IURI, IClient, IMutableFileNode, \
- IDirectoryURI, IReadonlyDirectoryURI, IFileNode, \
+from allmydata.interfaces import IFileNode, \
ExistingChildError, NoSuchChildError, \
IDeepCheckResults, IDeepCheckAndRepairResults, CannotPackUnknownNodeError
from allmydata.mutable.filenode import MutableFileNode
from allmydata.util import hashutil, base32
from allmydata.monitor import Monitor
from allmydata.test.common import make_chk_file_uri, make_mutable_file_uri, \
- FakeDirectoryNode, create_chk_filenode, ErrorMixin
+ ErrorMixin
from allmydata.test.no_network import GridTestMixin
-from allmydata.check_results import CheckResults, CheckAndRepairResults
from allmydata.unknown import UnknownNode
+from allmydata.nodemaker import NodeMaker
from base64 import b32decode
import common_util as testutil
-# to test dirnode.py, we want to construct a tree of real DirectoryNodes that
-# contain pointers to fake files. We start with a fake MutableFileNode that
-# stores all of its data in a static table.
-
-class Marker:
- implements(IFileNode, IMutableFileNode) # sure, why not
- def __init__(self, nodeuri):
- if not isinstance(nodeuri, str):
- nodeuri = nodeuri.to_string()
- self.nodeuri = nodeuri
- si = hashutil.tagged_hash("tag1", nodeuri)[:16]
- self.storage_index = si
- fp = hashutil.tagged_hash("tag2", nodeuri)
- self.verifieruri = uri.SSKVerifierURI(storage_index=si, fingerprint=fp)
- def get_uri(self):
- return self.nodeuri
- def get_readonly_uri(self):
- return self.nodeuri
- def get_verify_cap(self):
- return self.verifieruri
- def get_storage_index(self):
- return self.storage_index
-
- def check(self, monitor, verify=False, add_lease=False):
- r = CheckResults(uri.from_string(self.nodeuri), None)
- r.set_healthy(True)
- r.set_recoverable(True)
- return defer.succeed(r)
-
- def check_and_repair(self, monitor, verify=False, add_lease=False):
- d = self.check(verify)
- def _got(cr):
- r = CheckAndRepairResults(None)
- r.pre_repair_results = r.post_repair_results = cr
- return r
- d.addCallback(_got)
- return d
-
-# dirnode requires three methods from the client: upload(),
-# create_node_from_uri(), and create_empty_dirnode(). Of these, upload() is
-# only used by the convenience composite method add_file().
-
-class FakeClient:
- implements(IClient)
-
- def upload(self, uploadable):
- d = uploadable.get_size()
- d.addCallback(lambda size: uploadable.read(size))
- def _got_data(datav):
- data = "".join(datav)
- n = create_chk_filenode(self, data)
- results = upload.UploadResults()
- results.uri = n.get_uri()
- return results
- d.addCallback(_got_data)
- return d
-
- def create_node_from_uri(self, u, readcap=None):
- if not u:
- u = readcap
- u = IURI(u)
- if (IDirectoryURI.providedBy(u)
- or IReadonlyDirectoryURI.providedBy(u)):
- return FakeDirectoryNode(self).init_from_uri(u)
- return Marker(u.to_string())
-
- def create_empty_dirnode(self):
- n = FakeDirectoryNode(self)
- d = n.create()
- d.addCallback(lambda res: n)
- return d
-
-class Dirnode(unittest.TestCase,
+class Dirnode(GridTestMixin, unittest.TestCase,
testutil.ShouldFailMixin, testutil.StallMixin, ErrorMixin):
timeout = 240 # It takes longer than 120 seconds on Francois's arm box.
- def setUp(self):
- self.client = FakeClient()
- # This is a base32-encoded representation of the directory tree
- # root/file1
- # root/file2
- # root/file3
- # as represented after being fed to _pack_contents.
- # We have it here so we can decode it, feed it to
- # _unpack_contents, and verify that _unpack_contents
- # works correctly.
-
- self.known_tree = "GM4TOORVHJTGS3DFGEWDSNJ2KVJESOSDJBFTU33MPB2GS3LZNVYG6N3GGI3WU5TIORTXC3DOMJ2G4NB2MVWXUZDONBVTE5LNGRZWK2LYN55GY23XGNYXQMTOMZUWU5TENN4DG23ZG5UTO2L2NQ2DO6LFMRWDMZJWGRQTUMZ2GEYDUMJQFQYTIMZ22XZKZORX5XS7CAQCSK3URR6QOHISHRCMGER5LRFSZRNAS5ZSALCS6TWFQAE754IVOIKJVK73WZPP3VUUEDTX3WHTBBZ5YX3CEKHCPG3ZWQLYA4QM6LDRCF7TJQYWLIZHKGN5ROA3AUZPXESBNLQQ6JTC2DBJU2D47IZJTLR3PKZ4RVF57XLPWY7FX7SZV3T6IJ3ORFW37FXUPGOE3ROPFNUX5DCGMAQJ3PGGULBRGM3TU6ZCMN2GS3LFEI5CAMJSGQ3DMNRTHA4TOLRUGI3TKNRWGEWCAITUMFUG6ZJCHIQHWITMNFXGW3LPORUW2ZJCHIQDCMRUGY3DMMZYHE3S4NBSG42TMNRRFQQCE3DJNZVWG4TUNFWWKIR2EAYTENBWGY3DGOBZG4XDIMRXGU3DMML5FQQCE3LUNFWWKIR2EAYTENBWGY3DGOBZG4XDIMRXGU3DMML5FQWDGOJRHI2TUZTJNRSTELBZGQ5FKUSJHJBUQSZ2MFYGKZ3SOBSWQ43IO52WO23CNAZWU3DUGVSWSNTIOE5DK33POVTW4ZLNMNWDK6DHPA2GS2THNF2W25DEN5VGY2LQNFRGG5DKNNRHO5TZPFTWI6LNMRYGQ2LCGJTHM4J2GM5DCMB2GQWDCNBSHKVVQBGRYMACKJ27CVQ6O6B4QPR72RFVTGOZUI76XUSWAX73JRV5PYRHMIFYZIA25MXDPGUGML6M2NMRSG4YD4W4K37ZDYSXHMJ3IUVT4F64YTQQVBJFFFOUC7J7LAB2VFCL5UKKGMR2D3F4EPOYC7UYWQZNR5KXHBSNXLCNBX2SNF22DCXJIHSMEKWEWOG5XCJEVVZ7UW5IB6I64XXQSJ34B5CAYZGZIIMR6LBRGMZTU6ZCMN2GS3LFEI5CAMJSGQ3DMNRTHA4TOLRUGMYDEMJYFQQCE5DBNBXWKIR2EB5SE3DJNZVW233UNFWWKIR2EAYTENBWGY3DGOBZG4XDIMZQGIYTQLBAEJWGS3TLMNZHI2LNMURDUIBRGI2DMNRWGM4DSNZOGQZTAMRRHB6SYIBCNV2GS3LFEI5CAMJSGQ3DMNRTHA4TOLRUGMYDEMJYPUWCYMZZGU5DKOTGNFWGKMZMHE2DUVKSJE5EGSCLHJRW25DDPBYTO2DXPB3GM6DBNYZTI6LJMV3DM2LWNB4TU4LWMNSWW3LKORXWK5DEMN3TI23NNE3WEM3SORRGY5THPA3TKNBUMNZG453BOF2GSZLXMVWWI3DJOFZW623RHIZTUMJQHI2SYMJUGI5BOSHWDPG3WKPAVXCF3XMKA7QVIWPRMWJHDTQHD27AHDCPJWDQENQ5H5ZZILTXQNIXXCIW4LKQABU2GCFRG5FHQN7CHD7HF4EKNRZFIV2ZYQIBM7IQU7F4RGB3XCX3FREPBKQ7UCICHVWPCYFGA6OLH3J45LXQ6GWWICJ3PGWJNLZ7PCRNLAPNYUGU6BENS7OXMBEOOFRIZV3PF2FFWZ5WHDPKXERYP7GNHKRMGEZTOOT3EJRXI2LNMURDUIBRGI2DMNRWGM4DSNZOGQZTGNRSGY4SYIBCORQWQ33FEI5CA6ZCNRUW423NN52GS3LFEI5CAMJSGQ3DMNRTHA4TOLRUGMZTMMRWHEWCAITMNFXGWY3SORUW2ZJCHIQDCMRUGY3DMMZYHE3S4NBTGM3DENRZPUWCAITNORUW2ZJCHIQDCMRUGY3DMMZYHE3S4NBTGM3DENRZPUWCY==="
def test_basic(self):
- d = self.client.create_empty_dirnode()
+ self.basedir = "dirnode/Dirnode/test_basic"
+ self.set_up_grid()
+ c = self.g.clients[0]
+ d = c.create_empty_dirnode()
def _done(res):
- self.failUnless(isinstance(res, FakeDirectoryNode))
+ self.failUnless(isinstance(res, dirnode.DirectoryNode))
rep = str(res)
self.failUnless("RW" in rep)
d.addCallback(_done)
return d
def test_check(self):
- d = self.client.create_empty_dirnode()
+ self.basedir = "dirnode/Dirnode/test_check"
+ self.set_up_grid()
+ c = self.g.clients[0]
+ d = c.create_empty_dirnode()
d.addCallback(lambda dn: dn.check(Monitor()))
def _done(res):
self.failUnless(res.is_healthy())
# root/subdir/file1
# root/subdir/link -> root
# root/rodir
- d = self.client.create_empty_dirnode()
+ c = self.g.clients[0]
+ d = c.create_empty_dirnode()
def _created_root(rootnode):
self._rootnode = rootnode
return rootnode.create_empty_directory(u"subdir")
d.addCallback(_created_root)
def _created_subdir(subdir):
self._subdir = subdir
- d = subdir.add_file(u"file1", upload.Data("data", None))
+ d = subdir.add_file(u"file1", upload.Data("data"*100, None))
d.addCallback(lambda res: subdir.set_node(u"link", self._rootnode))
- d.addCallback(lambda res: self.client.create_empty_dirnode())
+ d.addCallback(lambda res: c.create_empty_dirnode())
d.addCallback(lambda dn:
self._rootnode.set_uri(u"rodir",
dn.get_readonly_uri()))
return d
def test_deepcheck(self):
+ self.basedir = "dirnode/Dirnode/test_deepcheck"
+ self.set_up_grid()
d = self._test_deepcheck_create()
d.addCallback(lambda rootnode: rootnode.start_deep_check().when_done())
def _check_results(r):
return d
def test_deepcheck_and_repair(self):
+ self.basedir = "dirnode/Dirnode/test_deepcheck_and_repair"
+ self.set_up_grid()
d = self._test_deepcheck_create()
d.addCallback(lambda rootnode:
rootnode.start_deep_check_and_repair().when_done())
return d
def _mark_file_bad(self, rootnode):
- si = IURI(rootnode.get_uri())._filenode_uri.storage_index
- rootnode._node.bad_shares[si] = "unhealthy"
+ si = rootnode.get_storage_index()
+ self.delete_shares_numbered(rootnode.get_uri(), [0])
return rootnode
def test_deepcheck_problems(self):
+ self.basedir = "dirnode/Dirnode/test_deepcheck_problems"
+ self.set_up_grid()
d = self._test_deepcheck_create()
d.addCallback(lambda rootnode: self._mark_file_bad(rootnode))
d.addCallback(lambda rootnode: rootnode.start_deep_check().when_done())
return d
def test_readonly(self):
- fileuri = make_chk_file_uri(1234)
- filenode = self.client.create_node_from_uri(fileuri)
+ self.basedir = "dirnode/Dirnode/test_readonly"
+ self.set_up_grid()
+ c = self.g.clients[0]
+ nm = c.nodemaker
+ filecap = make_chk_file_uri(1234)
+ filenode = nm.create_from_cap(filecap)
uploadable = upload.Data("some data", convergence="some convergence string")
- d = self.client.create_empty_dirnode()
+ d = c.create_empty_dirnode()
def _created(rw_dn):
- d2 = rw_dn.set_uri(u"child", fileuri.to_string())
+ d2 = rw_dn.set_uri(u"child", filecap)
d2.addCallback(lambda res: rw_dn)
return d2
d.addCallback(_created)
def _ready(rw_dn):
ro_uri = rw_dn.get_readonly_uri()
- ro_dn = self.client.create_node_from_uri(ro_uri)
+ ro_dn = c.create_node_from_uri(ro_uri)
self.failUnless(ro_dn.is_readonly())
self.failUnless(ro_dn.is_mutable())
self.shouldFail(dirnode.NotMutableError, "set_uri ro", None,
- ro_dn.set_uri, u"newchild", fileuri.to_string())
+ ro_dn.set_uri, u"newchild", filecap)
self.shouldFail(dirnode.NotMutableError, "set_uri ro", None,
ro_dn.set_node, u"newchild", filenode)
self.shouldFail(dirnode.NotMutableError, "set_nodes ro", None,
self.failUnless(a >= b, "%r should be >= %r" % (a, b))
def test_create(self):
+ self.basedir = "dirnode/Dirnode/test_create"
+ self.set_up_grid()
+ c = self.g.clients[0]
+
self.expected_manifest = []
self.expected_verifycaps = set()
self.expected_storage_indexes = set()
- d = self.client.create_empty_dirnode()
+ d = c.create_empty_dirnode()
def _then(n):
# /
+ self.rootnode = n
self.failUnless(n.is_mutable())
u = n.get_uri()
self.failUnless(u)
d.addCallback(lambda res: self.failUnlessEqual(res, {}))
d.addCallback(lambda res: n.has_child(u"missing"))
d.addCallback(lambda res: self.failIf(res))
+
fake_file_uri = make_mutable_file_uri()
other_file_uri = make_mutable_file_uri()
- m = Marker(fake_file_uri)
+ m = c.nodemaker.create_from_cap(fake_file_uri)
ffu_v = m.get_verify_cap().to_string()
self.expected_manifest.append( ((u"child",) , m.get_uri()) )
self.expected_verifycaps.add(ffu_v)
self.expected_storage_indexes.add(base32.b2a(m.get_storage_index()))
- d.addCallback(lambda res: n.set_uri(u"child", fake_file_uri.to_string()))
+ d.addCallback(lambda res: n.set_uri(u"child", fake_file_uri))
d.addCallback(lambda res:
self.shouldFail(ExistingChildError, "set_uri-no",
"child 'child' already exists",
- n.set_uri, u"child", other_file_uri.to_string(),
+ n.set_uri, u"child", other_file_uri,
overwrite=False))
# /
# /child = mutable
# /child = mutable
# /subdir = directory
def _created(subdir):
- self.failUnless(isinstance(subdir, FakeDirectoryNode))
+ self.failUnless(isinstance(subdir, dirnode.DirectoryNode))
self.subdir = subdir
new_v = subdir.get_verify_cap().to_string()
assert isinstance(new_v, str)
d.addCallback(lambda res: n.get_child_at_path(u"subdir/subsubdir"))
d.addCallback(lambda subsubdir:
self.failUnless(isinstance(subsubdir,
- FakeDirectoryNode)))
+ dirnode.DirectoryNode)))
d.addCallback(lambda res: n.get_child_at_path(u""))
d.addCallback(lambda res: self.failUnlessEqual(res.get_uri(),
n.get_uri()))
n.get_child_and_metadata_at_path(u""))
def _check_child_and_metadata1(res):
child, metadata = res
- self.failUnless(isinstance(child, FakeDirectoryNode))
+ self.failUnless(isinstance(child, dirnode.DirectoryNode))
# edge-metadata needs at least one path segment
self.failUnlessEqual(sorted(metadata.keys()), [])
d.addCallback(_check_child_and_metadata1)
def _check_child_and_metadata2(res):
child, metadata = res
self.failUnlessEqual(child.get_uri(),
- fake_file_uri.to_string())
+ fake_file_uri)
self.failUnlessEqual(set(metadata.keys()),
set(["tahoe", "ctime", "mtime"]))
d.addCallback(_check_child_and_metadata2)
n.get_child_and_metadata_at_path(u"subdir/subsubdir"))
def _check_child_and_metadata3(res):
child, metadata = res
- self.failUnless(isinstance(child, FakeDirectoryNode))
+ self.failUnless(isinstance(child, dirnode.DirectoryNode))
self.failUnlessEqual(set(metadata.keys()),
set(["tahoe", "ctime", "mtime"]))
d.addCallback(_check_child_and_metadata3)
# set_uri + metadata
# it should be possible to add a child without any metadata
- d.addCallback(lambda res: n.set_uri(u"c2", fake_file_uri.to_string(), {}))
+ d.addCallback(lambda res: n.set_uri(u"c2", fake_file_uri, {}))
d.addCallback(lambda res: n.get_metadata_for(u"c2"))
d.addCallback(lambda metadata: self.failUnlessEqual(metadata.keys(), ['tahoe']))
# You can't override the link timestamps.
- d.addCallback(lambda res: n.set_uri(u"c2", fake_file_uri.to_string(), { 'tahoe': {'linkcrtime': "bogus"}}))
+ d.addCallback(lambda res: n.set_uri(u"c2", fake_file_uri, { 'tahoe': {'linkcrtime': "bogus"}}))
d.addCallback(lambda res: n.get_metadata_for(u"c2"))
def _has_good_linkcrtime(metadata):
self.failUnless(metadata.has_key('tahoe'))
d.addCallback(_has_good_linkcrtime)
# if we don't set any defaults, the child should get timestamps
- d.addCallback(lambda res: n.set_uri(u"c3", fake_file_uri.to_string()))
+ d.addCallback(lambda res: n.set_uri(u"c3", fake_file_uri))
d.addCallback(lambda res: n.get_metadata_for(u"c3"))
d.addCallback(lambda metadata:
self.failUnlessEqual(set(metadata.keys()),
# or we can add specific metadata at set_uri() time, which
# overrides the timestamps
- d.addCallback(lambda res: n.set_uri(u"c4", fake_file_uri.to_string(),
+ d.addCallback(lambda res: n.set_uri(u"c4", fake_file_uri,
{"key": "value"}))
d.addCallback(lambda res: n.get_metadata_for(u"c4"))
d.addCallback(lambda metadata:
# set_node + metadata
# it should be possible to add a child without any metadata
d.addCallback(lambda res: n.set_node(u"d2", n, {}))
- d.addCallback(lambda res: self.client.create_empty_dirnode())
+ d.addCallback(lambda res: c.create_empty_dirnode())
d.addCallback(lambda n2:
self.shouldFail(ExistingChildError, "set_node-no",
"child 'd2' already exists",
d.addCallback(lambda res: n.delete(u"d4"))
# metadata through set_children()
- d.addCallback(lambda res: n.set_children([ (u"e1", fake_file_uri.to_string()),
- (u"e2", fake_file_uri.to_string(), {}),
- (u"e3", fake_file_uri.to_string(),
+ d.addCallback(lambda res: n.set_children([ (u"e1", fake_file_uri),
+ (u"e2", fake_file_uri, {}),
+ (u"e3", fake_file_uri,
{"key": "value"}),
]))
d.addCallback(lambda res:
self.failUnlessEqual(sorted(children.keys()),
sorted([u"child"])))
- uploadable = upload.Data("some data", convergence="some convergence string")
- d.addCallback(lambda res: n.add_file(u"newfile", uploadable))
+ uploadable1 = upload.Data("some data", convergence="converge")
+ d.addCallback(lambda res: n.add_file(u"newfile", uploadable1))
d.addCallback(lambda newnode:
self.failUnless(IFileNode.providedBy(newnode)))
- other_uploadable = upload.Data("some data", convergence="stuff")
+ uploadable2 = upload.Data("some data", convergence="stuff")
d.addCallback(lambda res:
self.shouldFail(ExistingChildError, "add_file-no",
"child 'newfile' already exists",
n.add_file, u"newfile",
- other_uploadable,
+ uploadable2,
overwrite=False))
d.addCallback(lambda res: n.list())
d.addCallback(lambda children:
self.failUnlessEqual(set(metadata.keys()),
set(["tahoe", "ctime", "mtime"])))
+ uploadable3 = upload.Data("some data", convergence="converge")
d.addCallback(lambda res: n.add_file(u"newfile-metadata",
- uploadable,
+ uploadable3,
{"key": "value"}))
d.addCallback(lambda newnode:
self.failUnless(IFileNode.providedBy(newnode)))
d.addCallback(lambda res: self.subdir2.get(u"child"))
d.addCallback(lambda child:
self.failUnlessEqual(child.get_uri(),
- fake_file_uri.to_string()))
+ fake_file_uri))
# move it back, using new_child_name=
d.addCallback(lambda res:
# now make sure that we honor overwrite=False
d.addCallback(lambda res:
- self.subdir2.set_uri(u"newchild", other_file_uri.to_string()))
+ self.subdir2.set_uri(u"newchild", other_file_uri))
d.addCallback(lambda res:
self.shouldFail(ExistingChildError, "move_child_to-no",
d.addCallback(lambda res: self.subdir2.get(u"newchild"))
d.addCallback(lambda child:
self.failUnlessEqual(child.get_uri(),
- other_file_uri.to_string()))
+ other_file_uri))
return d
d.addErrback(self.explain_error)
return d
+class Packing(unittest.TestCase):
+ # This is a base32-encoded representation of the directory tree
+ # root/file1
+ # root/file2
+ # root/file3
+ # as represented after being fed to _pack_contents.
+ # We have it here so we can decode it, feed it to
+ # _unpack_contents, and verify that _unpack_contents
+ # works correctly.
+
+ known_tree = "GM4TOORVHJTGS3DFGEWDSNJ2KVJESOSDJBFTU33MPB2GS3LZNVYG6N3GGI3WU5TIORTXC3DOMJ2G4NB2MVWXUZDONBVTE5LNGRZWK2LYN55GY23XGNYXQMTOMZUWU5TENN4DG23ZG5UTO2L2NQ2DO6LFMRWDMZJWGRQTUMZ2GEYDUMJQFQYTIMZ22XZKZORX5XS7CAQCSK3URR6QOHISHRCMGER5LRFSZRNAS5ZSALCS6TWFQAE754IVOIKJVK73WZPP3VUUEDTX3WHTBBZ5YX3CEKHCPG3ZWQLYA4QM6LDRCF7TJQYWLIZHKGN5ROA3AUZPXESBNLQQ6JTC2DBJU2D47IZJTLR3PKZ4RVF57XLPWY7FX7SZV3T6IJ3ORFW37FXUPGOE3ROPFNUX5DCGMAQJ3PGGULBRGM3TU6ZCMN2GS3LFEI5CAMJSGQ3DMNRTHA4TOLRUGI3TKNRWGEWCAITUMFUG6ZJCHIQHWITMNFXGW3LPORUW2ZJCHIQDCMRUGY3DMMZYHE3S4NBSG42TMNRRFQQCE3DJNZVWG4TUNFWWKIR2EAYTENBWGY3DGOBZG4XDIMRXGU3DMML5FQQCE3LUNFWWKIR2EAYTENBWGY3DGOBZG4XDIMRXGU3DMML5FQWDGOJRHI2TUZTJNRSTELBZGQ5FKUSJHJBUQSZ2MFYGKZ3SOBSWQ43IO52WO23CNAZWU3DUGVSWSNTIOE5DK33POVTW4ZLNMNWDK6DHPA2GS2THNF2W25DEN5VGY2LQNFRGG5DKNNRHO5TZPFTWI6LNMRYGQ2LCGJTHM4J2GM5DCMB2GQWDCNBSHKVVQBGRYMACKJ27CVQ6O6B4QPR72RFVTGOZUI76XUSWAX73JRV5PYRHMIFYZIA25MXDPGUGML6M2NMRSG4YD4W4K37ZDYSXHMJ3IUVT4F64YTQQVBJFFFOUC7J7LAB2VFCL5UKKGMR2D3F4EPOYC7UYWQZNR5KXHBSNXLCNBX2SNF22DCXJIHSMEKWEWOG5XCJEVVZ7UW5IB6I64XXQSJ34B5CAYZGZIIMR6LBRGMZTU6ZCMN2GS3LFEI5CAMJSGQ3DMNRTHA4TOLRUGMYDEMJYFQQCE5DBNBXWKIR2EB5SE3DJNZVW233UNFWWKIR2EAYTENBWGY3DGOBZG4XDIMZQGIYTQLBAEJWGS3TLMNZHI2LNMURDUIBRGI2DMNRWGM4DSNZOGQZTAMRRHB6SYIBCNV2GS3LFEI5CAMJSGQ3DMNRTHA4TOLRUGMYDEMJYPUWCYMZZGU5DKOTGNFWGKMZMHE2DUVKSJE5EGSCLHJRW25DDPBYTO2DXPB3GM6DBNYZTI6LJMV3DM2LWNB4TU4LWMNSWW3LKORXWK5DEMN3TI23NNE3WEM3SORRGY5THPA3TKNBUMNZG453BOF2GSZLXMVWWI3DJOFZW623RHIZTUMJQHI2SYMJUGI5BOSHWDPG3WKPAVXCF3XMKA7QVIWPRMWJHDTQHD27AHDCPJWDQENQ5H5ZZILTXQNIXXCIW4LKQABU2GCFRG5FHQN7CHD7HF4EKNRZFIV2ZYQIBM7IQU7F4RGB3XCX3FREPBKQ7UCICHVWPCYFGA6OLH3J45LXQ6GWWICJ3PGWJNLZ7PCRNLAPNYUGU6BENS7OXMBEOOFRIZV3PF2FFWZ5WHDPKXERYP7GNHKRMGEZTOOT3EJRXI2LNMURDUIBRGI2DMNRWGM4DSNZOGQZTGNRSGY4SYIBCORQWQ33FEI5CA6ZCNRUW423NN52GS3LFEI5CAMJSGQ3DMNRTHA4TOLRUGMZTMMRWHEWCAITMNFXGWY3SORUW2ZJCHIQDCMRUGY3DMMZYHE3S4NBTGM3DENRZPUWCAITNORUW2ZJCHIQDCMRUGY3DMMZYHE3S4NBTGM3DENRZPUWCY==="
+
def test_unpack_and_pack_behavior(self):
known_tree = b32decode(self.known_tree)
- d = self.client.create_empty_dirnode()
-
- def _check_tree(node):
- def check_children(children):
- # Are all the expected child nodes there?
- self.failUnless(children.has_key(u'file1'))
- self.failUnless(children.has_key(u'file2'))
- self.failUnless(children.has_key(u'file3'))
-
- # Are the metadata for child 3 right?
- file3_rocap = "URI:CHK:cmtcxq7hwxvfxan34yiev6ivhy:qvcekmjtoetdcw4kmi7b3rtblvgx7544crnwaqtiewemdliqsokq:3:10:5"
- file3_rwcap = "URI:CHK:cmtcxq7hwxvfxan34yiev6ivhy:qvcekmjtoetdcw4kmi7b3rtblvgx7544crnwaqtiewemdliqsokq:3:10:5"
- file3_metadata = {'ctime': 1246663897.4336269, 'tahoe': {'linkmotime': 1246663897.4336269, 'linkcrtime': 1246663897.4336269}, 'mtime': 1246663897.4336269}
- self.failUnlessEqual(file3_metadata, children[u'file3'][1])
- self.failUnlessEqual(file3_rocap,
- children[u'file3'][0].get_readonly_uri())
- self.failUnlessEqual(file3_rwcap,
- children[u'file3'][0].get_uri())
-
- # Are the metadata for child 2 right?
- file2_rocap = "URI:CHK:apegrpehshwugkbh3jlt5ei6hq:5oougnemcl5xgx4ijgiumtdojlipibctjkbwvyygdymdphib2fvq:3:10:4"
- file2_rwcap = "URI:CHK:apegrpehshwugkbh3jlt5ei6hq:5oougnemcl5xgx4ijgiumtdojlipibctjkbwvyygdymdphib2fvq:3:10:4"
- file2_metadata = {'ctime': 1246663897.430218, 'tahoe': {'linkmotime': 1246663897.430218, 'linkcrtime': 1246663897.430218}, 'mtime': 1246663897.430218}
- self.failUnlessEqual(file2_metadata, children[u'file2'][1])
- self.failUnlessEqual(file2_rocap,
- children[u'file2'][0].get_readonly_uri())
- self.failUnlessEqual(file2_rwcap,
- children[u'file2'][0].get_uri())
-
- # Are the metadata for child 1 right?
- file1_rocap = "URI:CHK:olxtimympo7f27jvhtgqlnbtn4:emzdnhk2um4seixozlkw3qx2nfijvdkx3ky7i7izl47yedl6e64a:3:10:10"
- file1_rwcap = "URI:CHK:olxtimympo7f27jvhtgqlnbtn4:emzdnhk2um4seixozlkw3qx2nfijvdkx3ky7i7izl47yedl6e64a:3:10:10"
- file1_metadata = {'ctime': 1246663897.4275661, 'tahoe': {'linkmotime': 1246663897.4275661, 'linkcrtime': 1246663897.4275661}, 'mtime': 1246663897.4275661}
- self.failUnlessEqual(file1_metadata, children[u'file1'][1])
- self.failUnlessEqual(file1_rocap,
- children[u'file1'][0].get_readonly_uri())
- self.failUnlessEqual(file1_rwcap,
- children[u'file1'][0].get_uri())
-
- children = node._unpack_contents(known_tree)
-
- check_children(children)
-
- packed_children = node._pack_contents(children)
-
- children = node._unpack_contents(packed_children)
-
- check_children(children)
-
- d.addCallback(_check_tree)
- return d
+ nodemaker = NodeMaker(None, None, None,
+ None, None, None,
+ {"k": 3, "n": 10}, None)
+ writecap = "URI:SSK-RO:e3mdrzfwhoq42hy5ubcz6rp3o4:ybyibhnp3vvwuq2vaw2ckjmesgkklfs6ghxleztqidihjyofgw7q"
+ filenode = nodemaker.create_from_cap(writecap)
+ node = dirnode.DirectoryNode(filenode, nodemaker, None)
+ children = node._unpack_contents(known_tree)
+ self._check_children(children)
+
+ packed_children = node._pack_contents(children)
+ children = node._unpack_contents(packed_children)
+ self._check_children(children)
+
+ def _check_children(self, children):
+ # Are all the expected child nodes there?
+ self.failUnless(children.has_key(u'file1'))
+ self.failUnless(children.has_key(u'file2'))
+ self.failUnless(children.has_key(u'file3'))
+
+ # Are the metadata for child 3 right?
+ file3_rocap = "URI:CHK:cmtcxq7hwxvfxan34yiev6ivhy:qvcekmjtoetdcw4kmi7b3rtblvgx7544crnwaqtiewemdliqsokq:3:10:5"
+ file3_rwcap = "URI:CHK:cmtcxq7hwxvfxan34yiev6ivhy:qvcekmjtoetdcw4kmi7b3rtblvgx7544crnwaqtiewemdliqsokq:3:10:5"
+ file3_metadata = {'ctime': 1246663897.4336269, 'tahoe': {'linkmotime': 1246663897.4336269, 'linkcrtime': 1246663897.4336269}, 'mtime': 1246663897.4336269}
+ self.failUnlessEqual(file3_metadata, children[u'file3'][1])
+ self.failUnlessEqual(file3_rocap,
+ children[u'file3'][0].get_readonly_uri())
+ self.failUnlessEqual(file3_rwcap,
+ children[u'file3'][0].get_uri())
+
+ # Are the metadata for child 2 right?
+ file2_rocap = "URI:CHK:apegrpehshwugkbh3jlt5ei6hq:5oougnemcl5xgx4ijgiumtdojlipibctjkbwvyygdymdphib2fvq:3:10:4"
+ file2_rwcap = "URI:CHK:apegrpehshwugkbh3jlt5ei6hq:5oougnemcl5xgx4ijgiumtdojlipibctjkbwvyygdymdphib2fvq:3:10:4"
+ file2_metadata = {'ctime': 1246663897.430218, 'tahoe': {'linkmotime': 1246663897.430218, 'linkcrtime': 1246663897.430218}, 'mtime': 1246663897.430218}
+ self.failUnlessEqual(file2_metadata, children[u'file2'][1])
+ self.failUnlessEqual(file2_rocap,
+ children[u'file2'][0].get_readonly_uri())
+ self.failUnlessEqual(file2_rwcap,
+ children[u'file2'][0].get_uri())
+
+ # Are the metadata for child 1 right?
+ file1_rocap = "URI:CHK:olxtimympo7f27jvhtgqlnbtn4:emzdnhk2um4seixozlkw3qx2nfijvdkx3ky7i7izl47yedl6e64a:3:10:10"
+ file1_rwcap = "URI:CHK:olxtimympo7f27jvhtgqlnbtn4:emzdnhk2um4seixozlkw3qx2nfijvdkx3ky7i7izl47yedl6e64a:3:10:10"
+ file1_metadata = {'ctime': 1246663897.4275661, 'tahoe': {'linkmotime': 1246663897.4275661, 'linkcrtime': 1246663897.4275661}, 'mtime': 1246663897.4275661}
+ self.failUnlessEqual(file1_metadata, children[u'file1'][1])
+ self.failUnlessEqual(file1_rocap,
+ children[u'file1'][0].get_readonly_uri())
+ self.failUnlessEqual(file1_rwcap,
+ children[u'file1'][0].get_uri())
def test_caching_dict(self):
d = dirnode.CachingDict()
self.data = modifier(self.data, None, True)
return defer.succeed(None)
+class FakeNodeMaker(NodeMaker):
+ def create_mutable_file(self, contents="", keysize=None):
+ return defer.succeed(FakeMutableFile(contents))
+
class FakeClient2(Client):
def __init__(self):
- pass
- def create_mutable_file(self, initial_contents=""):
- return defer.succeed(FakeMutableFile(initial_contents))
+ self.nodemaker = FakeNodeMaker(None, None, None,
+ None, None, None,
+ {"k":3,"n":10}, None)
+ def create_node_from_uri(self, rwcap, rocap):
+ return self.nodemaker.create_from_cap(rwcap, rocap)
class Dirnode2(unittest.TestCase, testutil.ShouldFailMixin):
def setUp(self):
self.client = FakeClient2()
+ self.nodemaker = self.client.nodemaker
def test_from_future(self):
# create a dirnode that contains unknown URI types, and make sure we
# tolerate them properly. Since dirnodes aren't allowed to add
# unknown node types, we have to be tricky.
- d = self.client.create_empty_dirnode()
+ d = self.nodemaker.create_new_mutable_directory()
future_writecap = "x-tahoe-crazy://I_am_from_the_future."
future_readcap = "x-tahoe-crazy-readonly://I_am_from_the_future."
future_node = UnknownNode(future_writecap, future_readcap)
return res
d.addCallback(_ucwe)
return d
-class UCWEingDirectoryNode(dirnode.DirectoryNode):
- filenode_class = UCWEingMutableFileNode
+
+class UCWEingNodeMaker(NodeMaker):
+ def _create_mutable(self, cap):
+ n = UCWEingMutableFileNode(self.storage_broker, self.secret_holder,
+ self.default_encoding_parameters,
+ self.history)
+ return n.init_from_uri(cap)
class Deleter(GridTestMixin, unittest.TestCase):
return dn.add_file(u"file", small)
d.addCallback(_created_dir)
def _do_delete(ignored):
- n = UCWEingDirectoryNode(c0).init_from_uri(self.root_uri)
+ nm = UCWEingNodeMaker(c0.storage_broker, c0._secret_holder,
+ c0.get_history(), c0.getServiceNamed("uploader"),
+ c0.getServiceNamed("downloader"),
+ c0.download_cache_dirman,
+ c0.get_encoding_parameters(),
+ c0._key_generator)
+ n = nm.create_from_cap(self.root_uri)
assert n._node.please_ucwe_after_next_upload == False
n._node.please_ucwe_after_next_upload = True
# This should succeed, not raise an exception
return d
-class Adder(unittest.TestCase,
- testutil.ShouldFailMixin, testutil.StallMixin, ErrorMixin):
-
- def setUp(self):
- self.client = FakeClient()
+class Adder(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
def test_overwrite(self):
+ # note: This functionality could be tested without actually creating
+ # several RSA keys. It would be faster without the GridTestMixin: use
+ # dn.set_node(nodemaker.create_from_cap(make_chk_file_uri())) instead
+ # of dn.add_file, and use a special NodeMaker that creates fake
+ # mutable files.
+ self.basedir = "dirnode/Adder/test_overwrite"
+ self.set_up_grid()
+ c = self.g.clients[0]
fileuri = make_chk_file_uri(1234)
- filenode = self.client.create_node_from_uri(fileuri)
- d = self.client.create_empty_dirnode()
+ filenode = c.nodemaker.create_from_cap(fileuri)
+ d = c.create_empty_dirnode()
def _create_directory_tree(root_node):
# Build
d.addCallback(lambda res:
root_node.set_node(u'dir2', filenode))
# We try overwriting a file with a child while also specifying
- # overwrite=False. We should receive an ExistingChildError
+ # overwrite=False. We should receive an ExistingChildError
# when we do this.
d.addCallback(lambda res:
self.shouldFail(ExistingChildError, "set_node",
from twisted.trial import unittest
-from allmydata import uri
+from allmydata import uri, client
from allmydata.monitor import Monitor
from allmydata.immutable import filenode, download
from allmydata.mutable.filenode import MutableFileNode
return None
def get_encoding_parameters(self):
return {"k": 3, "n": 10}
+ def get_storage_broker(self):
+ return None
+ def get_history(self):
+ return None
+ _secret_holder = client.SecretHolder("lease secret")
class Node(unittest.TestCase):
def test_chk_filenode(self):
size=1000)
c = FakeClient()
cf = cachedir.CacheFile("none")
- fn1 = filenode.FileNode(u, c, cf)
- fn2 = filenode.FileNode(u, c, cf)
+ fn1 = filenode.FileNode(u.to_string(), None, None, None, None, cf)
+ fn2 = filenode.FileNode(u.to_string(), None, None, None, None, cf)
self.failUnlessEqual(fn1, fn2)
self.failIfEqual(fn1, "I am not a filenode")
self.failIfEqual(fn1, NotANode())
DATA = "I am a short file."
u = uri.LiteralFileURI(data=DATA)
c = None
- fn1 = filenode.LiteralFileNode(u, c)
- fn2 = filenode.LiteralFileNode(u, c)
+ fn1 = filenode.LiteralFileNode(u.to_string())
+ fn2 = filenode.LiteralFileNode(u.to_string())
self.failUnlessEqual(fn1, fn2)
self.failIfEqual(fn1, "I am not a filenode")
self.failIfEqual(fn1, NotANode())
si = hashutil.ssk_storage_index_hash(rk)
u = uri.WriteableSSKFileURI("\x00"*16, "\x00"*32)
- n = MutableFileNode(client).init_from_uri(u)
+ n = MutableFileNode(None, None, client.get_encoding_parameters(),
+ None).init_from_uri(u.to_string())
self.failUnlessEqual(n.get_writekey(), wk)
self.failUnlessEqual(n.get_readkey(), rk)
self.failUnlessEqual(n.is_mutable(), True)
self.failUnlessEqual(n.is_readonly(), False)
- n2 = MutableFileNode(client).init_from_uri(u)
+ n2 = MutableFileNode(None, None, client.get_encoding_parameters(),
+ None).init_from_uri(u.to_string())
self.failUnlessEqual(n, n2)
self.failIfEqual(n, "not even the right type")
self.failIfEqual(n, u) # not the right class
def test_literal_filenode(self):
DATA = "I am a short file."
u = uri.LiteralFileURI(data=DATA)
- fn1 = filenode.LiteralFileNode(u, None)
+ fn1 = filenode.LiteralFileNode(u.to_string())
d = fn1.check(Monitor())
def _check_checker_results(cr):
from allmydata.storage.server import si_b2a
from allmydata.storage_client import StorageFarmBroker
from allmydata.immutable import offloaded, upload
-from allmydata import uri
+from allmydata import uri, client
from allmydata.util import hashutil, fileutil, mathutil
from pycryptopp.cipher.aes import AES
}
stats_provider = None
storage_broker = StorageFarmBroker(None, True)
+ _secret_holder = client.SecretHolder("lease secret")
def log(self, *args, **kwargs):
return log.msg(*args, **kwargs)
def get_encoding_parameters(self):
-import os, struct
+import struct
from cStringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
-from twisted.python import failure
-from allmydata import uri
-from allmydata.storage.server import StorageServer
+from allmydata import uri, client
+from allmydata.nodemaker import NodeMaker
from allmydata.immutable import download
-from allmydata.util import base32, idlib
+from allmydata.util import base32
from allmydata.util.idlib import shortnodeid_b2a
-from allmydata.util.hashutil import tagged_hash
-from allmydata.util.fileutil import make_dirs
-from allmydata.interfaces import IURI, IMutableFileURI, IUploadable, \
- NotEnoughSharesError, IRepairResults, ICheckAndRepairResults
+from allmydata.util.hashutil import tagged_hash, ssk_writekey_hash, \
+ ssk_pubkey_fingerprint_hash
+from allmydata.interfaces import IRepairResults, ICheckAndRepairResults, \
+ NotEnoughSharesError
from allmydata.monitor import Monitor
from allmydata.test.common import ShouldFailMixin
+from allmydata.test.no_network import GridTestMixin
from foolscap.api import eventually, fireEventually
from foolscap.logging import log
from allmydata.storage_client import StorageFarmBroker
import common_util as testutil
-# this "FastMutableFileNode" exists solely to speed up tests by using smaller
-# public/private keys. Once we switch to fast DSA-based keys, we can get rid
-# of this.
-
-class FastMutableFileNode(MutableFileNode):
- SIGNATURE_KEY_SIZE = 522
-
# this "FakeStorage" exists to put the share data in RAM and avoid using real
# network connections, both to speed up the tests and to reduce the amount of
# non-mutable.py code being exercised.
self._sequence = None
self._pending = {}
self._pending_timer = None
- self._special_answers = {}
def read(self, peerid, storage_index):
shares = self._peers.get(peerid, {})
- if self._special_answers.get(peerid, []):
- mode = self._special_answers[peerid].pop(0)
- if mode == "fail":
- shares = failure.Failure(IntentionalError())
- elif mode == "none":
- shares = {}
- elif mode == "normal":
- pass
if self._sequence is None:
return defer.succeed(shares)
d = defer.Deferred()
return fireEventually(answer)
-# our "FakeClient" has just enough functionality of the real Client to let
-# the tests run.
-
-class FakeClient:
- mutable_file_node_class = FastMutableFileNode
-
- def __init__(self, num_peers=10):
- self._storage = FakeStorage()
- self._num_peers = num_peers
- peerids = [tagged_hash("peerid", "%d" % i)[:20]
- for i in range(self._num_peers)]
- self.nodeid = "fakenodeid"
- self.storage_broker = StorageFarmBroker(None, True)
- for peerid in peerids:
- fss = FakeStorageServer(peerid, self._storage)
- self.storage_broker.test_add_server(peerid, fss)
-
- def get_storage_broker(self):
- return self.storage_broker
- def debug_break_connection(self, peerid):
- self.storage_broker.test_servers[peerid].broken = True
- def debug_remove_connection(self, peerid):
- self.storage_broker.test_servers.pop(peerid)
- def debug_get_connection(self, peerid):
- return self.storage_broker.test_servers[peerid]
-
- def get_encoding_parameters(self):
- return {"k": 3, "n": 10}
-
- def log(self, msg, **kw):
- return log.msg(msg, **kw)
-
- def get_renewal_secret(self):
- return "I hereby permit you to renew my files"
- def get_cancel_secret(self):
- return "I hereby permit you to cancel my leases"
-
- def create_mutable_file(self, contents=""):
- n = self.mutable_file_node_class(self)
- d = n.create(contents)
- d.addCallback(lambda res: n)
- return d
-
- def get_history(self):
- return None
-
- def create_node_from_uri(self, u, readcap=None):
- if not u:
- u = readcap
- u = IURI(u)
- assert IMutableFileURI.providedBy(u), u
- res = self.mutable_file_node_class(self).init_from_uri(u)
- return res
-
- def upload(self, uploadable):
- assert IUploadable.providedBy(uploadable)
- d = uploadable.get_size()
- d.addCallback(lambda length: uploadable.read(length))
- #d.addCallback(self.create_mutable_file)
- def _got_data(datav):
- data = "".join(datav)
- #newnode = FastMutableFileNode(self)
- return uri.LiteralFileURI(data)
- d.addCallback(_got_data)
- return d
-
-
def flip_bit(original, byte_offset):
return (original[:byte_offset] +
chr(ord(original[byte_offset]) ^ 0x01) +
shares[shnum] = flip_bit(data, real_offset)
return res
+def make_storagebroker(s=None, num_peers=10):
+ if not s:
+ s = FakeStorage()
+ peerids = [tagged_hash("peerid", "%d" % i)[:20]
+ for i in range(num_peers)]
+ storage_broker = StorageFarmBroker(None, True)
+ for peerid in peerids:
+ fss = FakeStorageServer(peerid, s)
+ storage_broker.test_add_server(peerid, fss)
+ return storage_broker
+
+def make_nodemaker(s=None, num_peers=10):
+ storage_broker = make_storagebroker(s, num_peers)
+ sh = client.SecretHolder("lease secret")
+ keygen = client.KeyGenerator()
+ keygen.set_default_keysize(522)
+ nodemaker = NodeMaker(storage_broker, sh, None,
+ None, None, None,
+ {"k": 3, "n": 10}, keygen)
+ return nodemaker
+
class Filenode(unittest.TestCase, testutil.ShouldFailMixin):
# this used to be in Publish, but we removed the limit. Some of
# these tests test whether the new code correctly allows files
# larger than the limit.
OLD_MAX_SEGMENT_SIZE = 3500000
def setUp(self):
- self.client = FakeClient()
+ self._storage = s = FakeStorage()
+ self.nodemaker = make_nodemaker(s)
def test_create(self):
- d = self.client.create_mutable_file()
+ d = self.nodemaker.create_mutable_file()
def _created(n):
- self.failUnless(isinstance(n, FastMutableFileNode))
+ self.failUnless(isinstance(n, MutableFileNode))
self.failUnlessEqual(n.get_storage_index(), n._storage_index)
- sb = self.client.get_storage_broker()
+ sb = self.nodemaker.storage_broker
peer0 = sorted(sb.get_all_serverids())[0]
- shnums = self.client._storage._peers[peer0].keys()
+ shnums = self._storage._peers[peer0].keys()
self.failUnlessEqual(len(shnums), 1)
d.addCallback(_created)
return d
def test_serialize(self):
- n = MutableFileNode(self.client)
+ n = MutableFileNode(None, None, {"k": 3, "n": 10}, None)
calls = []
def _callback(*args, **kwargs):
self.failUnlessEqual(args, (4,) )
return d
def test_upload_and_download(self):
- d = self.client.create_mutable_file()
+ d = self.nodemaker.create_mutable_file()
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
return d
def test_create_with_initial_contents(self):
- d = self.client.create_mutable_file("contents 1")
+ d = self.nodemaker.create_mutable_file("contents 1")
def _created(n):
d = n.download_best_version()
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
def test_create_with_too_large_contents(self):
BIG = "a" * (self.OLD_MAX_SEGMENT_SIZE + 1)
- d = self.client.create_mutable_file(BIG)
+ d = self.nodemaker.create_mutable_file(BIG)
def _created(n):
d = n.overwrite(BIG)
return d
raise UncoordinatedWriteError("simulated")
return old_contents
- d = self.client.create_mutable_file("line1")
+ d = self.nodemaker.create_mutable_file("line1")
def _created(n):
d = n.modify(_modifier)
d.addCallback(lambda res: n.download_best_version())
giveuper._delay = 0.1
giveuper.factor = 1
- d = self.client.create_mutable_file("line1")
+ d = self.nodemaker.create_mutable_file("line1")
def _created(n):
d = n.modify(_modifier)
d.addCallback(lambda res: n.download_best_version())
return d
def test_upload_and_download_full_size_keys(self):
- self.client.mutable_file_node_class = MutableFileNode
- d = self.client.create_mutable_file()
+ self.nodemaker.key_generator = client.KeyGenerator()
+ d = self.nodemaker.create_mutable_file()
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
class MakeShares(unittest.TestCase):
def test_encrypt(self):
- c = FakeClient()
- fn = FastMutableFileNode(c)
+ nm = make_nodemaker()
CONTENTS = "some initial contents"
- d = fn.create(CONTENTS)
- def _created(res):
- p = Publish(fn, None)
+ d = nm.create_mutable_file(CONTENTS)
+ def _created(fn):
+ p = Publish(fn, nm.storage_broker, None)
p.salt = "SALT" * 4
p.readkey = "\x00" * 16
p.newdata = CONTENTS
return d
def test_generate(self):
- c = FakeClient()
- fn = FastMutableFileNode(c)
+ nm = make_nodemaker()
CONTENTS = "some initial contents"
- d = fn.create(CONTENTS)
- def _created(res):
- p = Publish(fn, None)
+ d = nm.create_mutable_file(CONTENTS)
+ def _created(fn):
+ self._fn = fn
+ p = Publish(fn, nm.storage_broker, None)
self._p = p
p.newdata = CONTENTS
p.required_shares = 3
self.failUnlessEqual(len(block_hash_tree), 1) # very small tree
self.failUnlessEqual(IV, "SALT"*4)
self.failUnlessEqual(len(share_data), len("%07d" % 1))
- self.failUnlessEqual(enc_privkey, fn.get_encprivkey())
+ self.failUnlessEqual(enc_privkey, self._fn.get_encprivkey())
d.addCallback(_generated)
return d
# later.
self.CONTENTS = "New contents go here" * 1000
num_peers = 20
- self._client = FakeClient(num_peers)
- self._storage = self._client._storage
- d = self._client.create_mutable_file(self.CONTENTS)
+ self._storage = FakeStorage()
+ self._nodemaker = make_nodemaker(self._storage)
+ self._storage_broker = self._nodemaker.storage_broker
+ d = self._nodemaker.create_mutable_file(self.CONTENTS)
def _created(node):
self._fn = node
- self._fn2 = self._client.create_node_from_uri(node.get_uri())
+ self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
d.addCallback(_created)
return d
+
def publish_multiple(self):
self.CONTENTS = ["Contents 0",
"Contents 1",
"Contents 3b"]
self._copied_shares = {}
num_peers = 20
- self._client = FakeClient(num_peers)
- self._storage = self._client._storage
- d = self._client.create_mutable_file(self.CONTENTS[0]) # seqnum=1
+ self._storage = FakeStorage()
+ self._nodemaker = make_nodemaker(self._storage)
+ d = self._nodemaker.create_mutable_file(self.CONTENTS[0]) # seqnum=1
def _created(node):
self._fn = node
# now create multiple versions of the same file, and accumulate
return d
def _copy_shares(self, ignored, index):
- shares = self._client._storage._peers
+ shares = self._storage._peers
# we need a deep copy
new_shares = {}
for peerid in shares:
# versionmap maps shnums to which version (0,1,2,3,4) we want the
# share to be at. Any shnum which is left out of the map will stay at
# its current version.
- shares = self._client._storage._peers
+ shares = self._storage._peers
oldshares = self._copied_shares
for peerid in shares:
for shnum in shares[peerid]:
def setUp(self):
return self.publish_one()
- def make_servermap(self, mode=MODE_CHECK, fn=None):
+ def make_servermap(self, mode=MODE_CHECK, fn=None, sb=None):
if fn is None:
fn = self._fn
- smu = ServermapUpdater(fn, Monitor(), ServerMap(), mode)
+ if sb is None:
+ sb = self._storage_broker
+ smu = ServermapUpdater(fn, sb, Monitor(),
+ ServerMap(), mode)
d = smu.update()
return d
def update_servermap(self, oldmap, mode=MODE_CHECK):
- smu = ServermapUpdater(self._fn, Monitor(), oldmap, mode)
+ smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
+ oldmap, mode)
d = smu.update()
return d
# create a new file, which is large enough to knock the privkey out
# of the early part of the file
LARGE = "These are Larger contents" * 200 # about 5KB
- d.addCallback(lambda res: self._client.create_mutable_file(LARGE))
+ d.addCallback(lambda res: self._nodemaker.create_mutable_file(LARGE))
def _created(large_fn):
- large_fn2 = self._client.create_node_from_uri(large_fn.get_uri())
+ large_fn2 = self._nodemaker.create_from_cap(large_fn.get_uri())
return self.make_servermap(MODE_WRITE, large_fn2)
d.addCallback(_created)
d.addCallback(lambda sm: self.failUnlessOneRecoverable(sm, 10))
self.failUnlessEqual(len(sm.shares_available()), 0)
def test_no_shares(self):
- self._client._storage._peers = {} # delete all shares
+ self._storage._peers = {} # delete all shares
ms = self.make_servermap
d = defer.succeed(None)
return sm
def test_not_quite_enough_shares(self):
- s = self._client._storage
+ s = self._storage
ms = self.make_servermap
num_shares = len(s._peers)
for peerid in s._peers:
def setUp(self):
return self.publish_one()
- def make_servermap(self, mode=MODE_READ, oldmap=None):
+ def make_servermap(self, mode=MODE_READ, oldmap=None, sb=None):
if oldmap is None:
oldmap = ServerMap()
- smu = ServermapUpdater(self._fn, Monitor(), oldmap, mode)
+ if sb is None:
+ sb = self._storage_broker
+ smu = ServermapUpdater(self._fn, sb, Monitor(), oldmap, mode)
d = smu.update()
return d
return d
def test_no_servers(self):
- c2 = FakeClient(0)
- self._fn._client = c2
+ sb2 = make_storagebroker(num_peers=0)
# if there are no servers, then a MODE_READ servermap should come
# back empty
- d = self.make_servermap()
+ d = self.make_servermap(sb=sb2)
def _check_servermap(servermap):
self.failUnlessEqual(servermap.best_recoverable_version(), None)
self.failIf(servermap.recoverable_versions())
test_no_servers.timeout = 15
def test_no_servers_download(self):
- c2 = FakeClient(0)
- self._fn._client = c2
+ sb2 = make_storagebroker(num_peers=0)
+ self._fn._storage_broker = sb2
d = self.shouldFail(UnrecoverableFileError,
"test_no_servers_download",
"no recoverable versions",
# anybody should not prevent a subsequent download from working.
# This isn't quite the webapi-driven test that #463 wants, but it
# should be close enough.
- self._fn._client = self._client
+ self._fn._storage_broker = self._storage_broker
return self._fn.download_best_version()
def _retrieved(new_contents):
self.failUnlessEqual(new_contents, self.CONTENTS)
d.addCallback(_check_results)
return d
+class DevNullDictionary(dict):
+ def __setitem__(self, key, value):
+ return
+
class MultipleEncodings(unittest.TestCase):
def setUp(self):
self.CONTENTS = "New contents go here"
- num_peers = 20
- self._client = FakeClient(num_peers)
- self._storage = self._client._storage
- d = self._client.create_mutable_file(self.CONTENTS)
+ self._storage = FakeStorage()
+ self._nodemaker = make_nodemaker(self._storage, num_peers=20)
+ self._storage_broker = self._nodemaker.storage_broker
+ d = self._nodemaker.create_mutable_file(self.CONTENTS)
def _created(node):
self._fn = node
d.addCallback(_created)
def _encode(self, k, n, data):
# encode 'data' into a peerid->shares dict.
- fn2 = FastMutableFileNode(self._client)
- # init_from_uri populates _uri, _writekey, _readkey, _storage_index,
- # and _fingerprint
fn = self._fn
- fn2.init_from_uri(fn.get_uri())
+ # disable the nodecache, since for these tests we explicitly need
+ # multiple nodes pointing at the same file
+ self._nodemaker._node_cache = DevNullDictionary()
+ fn2 = self._nodemaker.create_from_cap(fn.get_uri())
# then we copy over other fields that are normally fetched from the
# existing shares
fn2._pubkey = fn._pubkey
fn2._required_shares = k
fn2._total_shares = n
- s = self._client._storage
+ s = self._storage
s._peers = {} # clear existing storage
- p2 = Publish(fn2, None)
+ p2 = Publish(fn2, self._storage_broker, None)
d = p2.publish(data)
def _published(res):
shares = s._peers
def make_servermap(self, mode=MODE_READ, oldmap=None):
if oldmap is None:
oldmap = ServerMap()
- smu = ServermapUpdater(self._fn, Monitor(), oldmap, mode)
+ smu = ServermapUpdater(self._fn, self._storage_broker, Monitor(),
+ oldmap, mode)
d = smu.update()
return d
# we make a retrieval object that doesn't know what encoding
# parameters to use
- fn3 = FastMutableFileNode(self._client)
- fn3.init_from_uri(self._fn.get_uri())
+ fn3 = self._nodemaker.create_from_cap(self._fn.get_uri())
# now we upload a file through fn1, and grab its shares
d = self._encode(3, 10, contents1)
places = [2, 2, 3, 2, 1, 1, 1, 2]
sharemap = {}
- sb = self._client.get_storage_broker()
+ sb = self._storage_broker
for peerid in sorted(sb.get_all_serverids()):
peerid_s = shortnodeid_b2a(peerid)
which = places[shnum]
else:
which = "x"
- self._client._storage._peers[peerid] = peers = {}
+ self._storage._peers[peerid] = peers = {}
in_1 = shnum in self._shares1[peerid]
in_2 = shnum in self._shares2.get(peerid, {})
in_3 = shnum in self._shares3.get(peerid, {})
# now sort the sequence so that share 0 is returned first
new_sequence = [sharemap[shnum]
for shnum in sorted(sharemap.keys())]
- self._client._storage._sequence = new_sequence
+ self._storage._sequence = new_sequence
log.msg("merge done")
d.addCallback(_merge)
d.addCallback(lambda res: fn3.download_best_version())
ucwe = UncoordinatedWriteError()
self.failUnless("UncoordinatedWriteError" in repr(ucwe), repr(ucwe))
-# we can't do this test with a FakeClient, since it uses FakeStorageServer
-# instances which always succeed. So we need a less-fake one.
-
-class IntentionalError(Exception):
- pass
-
-class LocalWrapper:
- def __init__(self, original):
- self.original = original
- self.broken = False
- self.post_call_notifier = None
- def callRemote(self, methname, *args, **kwargs):
- def _call():
- if self.broken:
- raise IntentionalError("I was asked to break")
- meth = getattr(self.original, "remote_" + methname)
- return meth(*args, **kwargs)
- d = fireEventually()
- d.addCallback(lambda res: _call())
- if self.post_call_notifier:
- d.addCallback(self.post_call_notifier, methname)
- return d
-
-class LessFakeClient(FakeClient):
-
- def __init__(self, basedir, num_peers=10):
- self._num_peers = num_peers
- peerids = [tagged_hash("peerid", "%d" % i)[:20]
- for i in range(self._num_peers)]
- self.storage_broker = StorageFarmBroker(None, True)
- for peerid in peerids:
- peerdir = os.path.join(basedir, idlib.shortnodeid_b2a(peerid))
- make_dirs(peerdir)
- ss = StorageServer(peerdir, peerid)
- lw = LocalWrapper(ss)
- self.storage_broker.test_add_server(peerid, lw)
- self.nodeid = "fakenodeid"
-
-
-class Problems(unittest.TestCase, testutil.ShouldFailMixin):
+class SameKeyGenerator:
+ def __init__(self, pubkey, privkey):
+ self.pubkey = pubkey
+ self.privkey = privkey
+ def generate(self, keysize=None):
+ return defer.succeed( (self.pubkey, self.privkey) )
+
+class FirstServerGetsKilled:
+ done = False
+ def notify(self, retval, wrapper, methname):
+ if not self.done:
+ wrapper.broken = True
+ self.done = True
+ return retval
+
+class FirstServerGetsDeleted:
+ def __init__(self):
+ self.done = False
+ self.silenced = None
+ def notify(self, retval, wrapper, methname):
+ if not self.done:
+ # this query will work, but later queries should think the share
+ # has been deleted
+ self.done = True
+ self.silenced = wrapper
+ return retval
+ if wrapper == self.silenced:
+ assert methname == "slot_testv_and_readv_and_writev"
+ return (True, {})
+ return retval
+
+class Problems(GridTestMixin, unittest.TestCase, testutil.ShouldFailMixin):
def test_publish_surprise(self):
- basedir = os.path.join("mutable/CollidingWrites/test_surprise")
- self.client = LessFakeClient(basedir)
- d = self.client.create_mutable_file("contents 1")
+ self.basedir = "mutable/Problems/test_publish_surprise"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ d = nm.create_mutable_file("contents 1")
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
return d
def test_retrieve_surprise(self):
- basedir = os.path.join("mutable/CollidingWrites/test_retrieve")
- self.client = LessFakeClient(basedir)
- d = self.client.create_mutable_file("contents 1")
+ self.basedir = "mutable/Problems/test_retrieve_surprise"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ d = nm.create_mutable_file("contents 1")
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_READ))
# upload using the old servermap. The last upload should fail with an
# UncoordinatedWriteError, because of the shares that didn't appear
# in the servermap.
- basedir = os.path.join("mutable/CollidingWrites/test_unexpexted_shares")
- self.client = LessFakeClient(basedir)
- d = self.client.create_mutable_file("contents 1")
+ self.basedir = "mutable/Problems/test_unexpected_shares"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ d = nm.create_mutable_file("contents 1")
def _created(n):
d = defer.succeed(None)
d.addCallback(lambda res: n.get_servermap(MODE_WRITE))
self.old_map = smap
# now shut down one of the servers
peer0 = list(smap.make_sharemap()[0])[0]
- self.client.debug_remove_connection(peer0)
+ self.g.remove_server(peer0)
# then modify the file, leaving the old map untouched
log.msg("starting winning write")
return n.overwrite("contents 2")
# Break one server, then create the file: the initial publish should
# complete with an alternate server. Breaking a second server should
# not prevent an update from succeeding either.
- basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
- self.client = LessFakeClient(basedir, 20)
+ self.basedir = "mutable/Problems/test_bad_server"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+
# to make sure that one of the initial peers is broken, we have to
- # get creative. We create the keys, so we can figure out the storage
- # index, but we hold off on doing the initial publish until we've
- # broken the server on which the first share wants to be stored.
- n = FastMutableFileNode(self.client)
- d = defer.succeed(None)
- d.addCallback(n._generate_pubprivkeys, keysize=522)
- d.addCallback(n._generated)
+ # get creative. We create an RSA key and compute its storage-index.
+ # Then we make a KeyGenerator that always returns that one key, and
+ # use it to create the mutable file. This will get easier when we can
+ # use #467 static-server-selection to disable permutation and force
+ # the choice of server for share[0].
+
+ d = nm.key_generator.generate(522)
+ def _got_key( (pubkey, privkey) ):
+ nm.key_generator = SameKeyGenerator(pubkey, privkey)
+ pubkey_s = pubkey.serialize()
+ privkey_s = privkey.serialize()
+ u = uri.WriteableSSKFileURI(ssk_writekey_hash(privkey_s),
+ ssk_pubkey_fingerprint_hash(pubkey_s))
+ self._storage_index = u.storage_index
+ d.addCallback(_got_key)
def _break_peer0(res):
- si = n.get_storage_index()
- peerlist = self.client.storage_broker.get_servers_for_index(si)
+ si = self._storage_index
+ peerlist = nm.storage_broker.get_servers_for_index(si)
peerid0, connection0 = peerlist[0]
peerid1, connection1 = peerlist[1]
connection0.broken = True
self.connection1 = connection1
d.addCallback(_break_peer0)
- # now let the initial publish finally happen
- d.addCallback(lambda res: n._upload("contents 1", None))
+ # now "create" the file, using the pre-established key, and let the
+ # initial publish finally happen
+ d.addCallback(lambda res: nm.create_mutable_file("contents 1"))
# that ought to work
- d.addCallback(lambda res: n.download_best_version())
- d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
- # now break the second peer
- def _break_peer1(res):
- self.connection1.broken = True
- d.addCallback(_break_peer1)
- d.addCallback(lambda res: n.overwrite("contents 2"))
- # that ought to work too
- d.addCallback(lambda res: n.download_best_version())
- d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
- def _explain_error(f):
- print f
- if f.check(NotEnoughServersError):
- print "first_error:", f.value.first_error
- return f
- d.addErrback(_explain_error)
+ def _got_node(n):
+ d = n.download_best_version()
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
+ # now break the second peer
+ def _break_peer1(res):
+ self.connection1.broken = True
+ d.addCallback(_break_peer1)
+ d.addCallback(lambda res: n.overwrite("contents 2"))
+ # that ought to work too
+ d.addCallback(lambda res: n.download_best_version())
+ d.addCallback(lambda res: self.failUnlessEqual(res, "contents 2"))
+ def _explain_error(f):
+ print f
+ if f.check(NotEnoughServersError):
+ print "first_error:", f.value.first_error
+ return f
+ d.addErrback(_explain_error)
+ return d
+ d.addCallback(_got_node)
return d
def test_bad_server_overlap(self):
# Break one server, then create the file: the initial publish should
# complete with an alternate server. Breaking a second server should
# not prevent an update from succeeding either.
- basedir = os.path.join("mutable/CollidingWrites/test_bad_server")
- self.client = LessFakeClient(basedir, 10)
- sb = self.client.get_storage_broker()
+ self.basedir = "mutable/Problems/test_bad_server_overlap"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ sb = nm.storage_broker
- peerids = list(sb.get_all_serverids())
- self.client.debug_break_connection(peerids[0])
+ peerids = [serverid for (serverid,ss) in sb.get_all_servers()]
+ self.g.break_server(peerids[0])
- d = self.client.create_mutable_file("contents 1")
+ d = nm.create_mutable_file("contents 1")
def _created(n):
d = n.download_best_version()
d.addCallback(lambda res: self.failUnlessEqual(res, "contents 1"))
# now break one of the remaining servers
def _break_second_server(res):
- self.client.debug_break_connection(peerids[1])
+ self.g.break_server(peerids[1])
d.addCallback(_break_second_server)
d.addCallback(lambda res: n.overwrite("contents 2"))
# that ought to work too
def test_publish_all_servers_bad(self):
# Break all servers: the publish should fail
- basedir = os.path.join("mutable/CollidingWrites/publish_all_servers_bad")
- self.client = LessFakeClient(basedir, 20)
- sb = self.client.get_storage_broker()
- for peerid in sb.get_all_serverids():
- self.client.debug_break_connection(peerid)
+ self.basedir = "mutable/Problems/test_publish_all_servers_bad"
+ self.set_up_grid()
+ nm = self.g.clients[0].nodemaker
+ for (serverid,ss) in nm.storage_broker.get_all_servers():
+ ss.broken = True
+
d = self.shouldFail(NotEnoughServersError,
"test_publish_all_servers_bad",
"Ran out of non-bad servers",
- self.client.create_mutable_file, "contents")
+ nm.create_mutable_file, "contents")
return d
def test_publish_no_servers(self):
# no servers at all: the publish should fail
- basedir = os.path.join("mutable/CollidingWrites/publish_no_servers")
- self.client = LessFakeClient(basedir, 0)
+ self.basedir = "mutable/Problems/test_publish_no_servers"
+ self.set_up_grid(num_servers=0)
+ nm = self.g.clients[0].nodemaker
+
d = self.shouldFail(NotEnoughServersError,
"test_publish_no_servers",
"Ran out of non-bad servers",
- self.client.create_mutable_file, "contents")
+ nm.create_mutable_file, "contents")
return d
test_publish_no_servers.timeout = 30
def test_privkey_query_error(self):
# when a servermap is updated with MODE_WRITE, it tries to get the
# privkey. Something might go wrong during this query attempt.
- self.client = FakeClient(20)
+ # Exercise the code in _privkey_query_failed which tries to handle
+ # such an error.
+ self.basedir = "mutable/Problems/test_privkey_query_error"
+ self.set_up_grid(num_servers=20)
+ nm = self.g.clients[0].nodemaker
+ nm._node_cache = DevNullDictionary() # disable the nodecache
+
# we need some contents that are large enough to push the privkey out
# of the early part of the file
- LARGE = "These are Larger contents" * 200 # about 5KB
- d = self.client.create_mutable_file(LARGE)
+ LARGE = "These are Larger contents" * 2000 # about 50KB
+ d = nm.create_mutable_file(LARGE)
def _created(n):
self.uri = n.get_uri()
- self.n2 = self.client.create_node_from_uri(self.uri)
- # we start by doing a map update to figure out which is the first
- # server.
- return n.get_servermap(MODE_WRITE)
+ self.n2 = nm.create_from_cap(self.uri)
+
+ # When a mapupdate is performed on a node that doesn't yet know
+ # the privkey, a short read is sent to a batch of servers, to get
+ # the verinfo and (hopefully, if the file is short enough) the
+ # encprivkey. Our file is too large to let this first read
+ # contain the encprivkey. Each non-encprivkey-bearing response
+ # that arrives (until the node gets the encprivkey) will trigger
+ # a second read to specifically read the encprivkey.
+ #
+ # So, to exercise this case:
+ # 1. notice which server gets a read() call first
+ # 2. tell that server to start throwing errors
+ killer = FirstServerGetsKilled()
+ for (serverid,ss) in nm.storage_broker.get_all_servers():
+ ss.post_call_notifier = killer.notify
d.addCallback(_created)
- d.addCallback(lambda res: fireEventually(res))
- def _got_smap1(smap):
- peer0 = list(smap.make_sharemap()[0])[0]
- # we tell the server to respond to this peer first, so that it
- # will be asked for the privkey first
- self.client._storage._sequence = [peer0]
- # now we make the peer fail their second query
- self.client._storage._special_answers[peer0] = ["normal", "fail"]
- d.addCallback(_got_smap1)
+
# now we update a servermap from a new node (which doesn't have the
- # privkey yet, forcing it to use a separate privkey query). Each
- # query response will trigger a privkey query, and since we're using
- # _sequence to make the peer0 response come back first, we'll send it
- # a privkey query first, and _sequence will again ensure that the
- # peer0 query will also come back before the others, and then
- # _special_answers will make sure that the query raises an exception.
- # The whole point of these hijinks is to exercise the code in
- # _privkey_query_failed. Note that the map-update will succeed, since
- # we'll just get a copy from one of the other shares.
+ # privkey yet, forcing it to use a separate privkey query). Note that
+ # the map-update will succeed, since we'll just get a copy from one
+ # of the other shares.
d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
- # Using FakeStorage._sequence means there will be read requests still
- # floating around.. wait for them to retire
- def _cancel_timer(res):
- if self.client._storage._pending_timer:
- self.client._storage._pending_timer.cancel()
- return res
- d.addBoth(_cancel_timer)
+
return d
def test_privkey_query_missing(self):
# like test_privkey_query_error, but the shares are deleted by the
# second query, instead of raising an exception.
- self.client = FakeClient(20)
- LARGE = "These are Larger contents" * 200 # about 5KB
- d = self.client.create_mutable_file(LARGE)
+ self.basedir = "mutable/Problems/test_privkey_query_missing"
+ self.set_up_grid(num_servers=20)
+ nm = self.g.clients[0].nodemaker
+ LARGE = "These are Larger contents" * 2000 # about 50KB
+ nm._node_cache = DevNullDictionary() # disable the nodecache
+
+ d = nm.create_mutable_file(LARGE)
def _created(n):
self.uri = n.get_uri()
- self.n2 = self.client.create_node_from_uri(self.uri)
- return n.get_servermap(MODE_WRITE)
+ self.n2 = nm.create_from_cap(self.uri)
+ deleter = FirstServerGetsDeleted()
+ for (serverid,ss) in nm.storage_broker.get_all_servers():
+ ss.post_call_notifier = deleter.notify
d.addCallback(_created)
- d.addCallback(lambda res: fireEventually(res))
- def _got_smap1(smap):
- peer0 = list(smap.make_sharemap()[0])[0]
- self.client._storage._sequence = [peer0]
- self.client._storage._special_answers[peer0] = ["normal", "none"]
- d.addCallback(_got_smap1)
d.addCallback(lambda res: self.n2.get_servermap(MODE_WRITE))
- def _cancel_timer(res):
- if self.client._storage._pending_timer:
- self.client._storage._pending_timer.cancel()
- return res
- d.addBoth(_cancel_timer)
return d
def _got_status(res):
# find an interesting upload and download to look at. LIT files
# are not interesting.
- for ds in self.clients[0].list_all_download_statuses():
+ h = self.clients[0].get_history()
+ for ds in h.list_all_download_statuses():
if ds.get_size() > 200:
self._down_status = ds.get_counter()
- for us in self.clients[0].list_all_upload_statuses():
+ for us in h.list_all_upload_statuses():
if us.get_size() > 200:
self._up_status = us.get_counter()
- rs = list(self.clients[0].list_all_retrieve_statuses())[0]
+ rs = list(h.list_all_retrieve_statuses())[0]
self._retrieve_status = rs.get_counter()
- ps = list(self.clients[0].list_all_publish_statuses())[0]
+ ps = list(h.list_all_publish_statuses())[0]
self._publish_status = ps.get_counter()
- us = list(self.clients[0].list_all_mapupdate_statuses())[0]
+ us = list(h.list_all_mapupdate_statuses())[0]
self._update_status = us.get_counter()
# and that there are some upload- and download- status pages
from foolscap.api import fireEventually
import allmydata # for __full_version__
-from allmydata import uri, monitor
+from allmydata import uri, monitor, client
from allmydata.immutable import upload
from allmydata.interfaces import IFileURI, FileTooLargeError, NoSharesError, \
NotEnoughSharesError
return self.DEFAULT_ENCODING_PARAMETERS
def get_storage_broker(self):
return self.storage_broker
-
- def get_renewal_secret(self):
- return ""
- def get_cancel_secret(self):
- return ""
+ _secret_holder = client.SecretHolder("lease secret")
class GotTooFarError(Exception):
pass
from allmydata.storage.shares import get_share_file
from allmydata.storage_client import StorageFarmBroker
from allmydata.immutable import upload, download
+from allmydata.nodemaker import NodeMaker
from allmydata.unknown import UnknownNode
from allmydata.web import status, common
from allmydata.scripts.debug import CorruptShareOptions, corrupt_share
from allmydata.util import fileutil, base32
-from allmydata.util.assertutil import precondition
-from allmydata.test.common import FakeDirectoryNode, FakeCHKFileNode, \
- FakeMutableFileNode, create_chk_filenode, WebErrorMixin, ShouldFailMixin
-from allmydata.interfaces import IURI, IDirectoryURI, IReadonlyDirectoryURI, \
- IFileURI, IMutableFileURI, IMutableFileNode, UnhandledCapTypeError
+from allmydata.test.common import FakeCHKFileNode, FakeMutableFileNode, \
+ create_chk_filenode, WebErrorMixin, ShouldFailMixin
+from allmydata.interfaces import IMutableFileNode
from allmydata.mutable import servermap, publish, retrieve
import common_util as testutil
from allmydata.test.no_network import GridTestMixin
-
from allmydata.test.common_web import HTTPClientGETFactory, \
HTTPClientHEADFactory
stats = {'stats': {}, 'counters': {}}
return stats
+class FakeNodeMaker(NodeMaker):
+ def _create_lit(self, cap):
+ return FakeCHKFileNode(cap)
+ def _create_immutable(self, cap):
+ return FakeCHKFileNode(cap)
+ def _create_mutable(self, cap):
+ return FakeMutableFileNode(None, None, None, None).init_from_uri(cap)
+ def create_mutable_file(self, contents="", keysize=None):
+ n = FakeMutableFileNode(None, None, None, None)
+ return n.create(contents)
+
+class FakeUploader:
+ def upload(self, uploadable):
+ d = uploadable.get_size()
+ d.addCallback(lambda size: uploadable.read(size))
+ def _got_data(datav):
+ data = "".join(datav)
+ n = create_chk_filenode(data)
+ results = upload.UploadResults()
+ results.uri = n.get_uri()
+ return results
+ d.addCallback(_got_data)
+ return d
+
+class FakeHistory:
+ _all_upload_status = [upload.UploadStatus()]
+ _all_download_status = [download.DownloadStatus()]
+ _all_mapupdate_statuses = [servermap.UpdateStatus()]
+ _all_publish_statuses = [publish.PublishStatus()]
+ _all_retrieve_statuses = [retrieve.RetrieveStatus()]
+
+ def list_all_upload_statuses(self):
+ return self._all_upload_status
+ def list_all_download_statuses(self):
+ return self._all_download_status
+ def list_all_mapupdate_statuses(self):
+ return self._all_mapupdate_statuses
+ def list_all_publish_statuses(self):
+ return self._all_publish_statuses
+ def list_all_retrieve_statuses(self):
+ return self._all_retrieve_statuses
+ def list_all_helper_statuses(self):
+ return []
+
class FakeClient(service.MultiService):
+ def __init__(self):
+ service.MultiService.__init__(self)
+ self.uploader = FakeUploader()
+ self.nodemaker = FakeNodeMaker(None, None, None,
+ self.uploader, None, None,
+ None, None)
+
nodeid = "fake_nodeid"
nickname = "fake_nickname"
basedir = "fake_basedir"
}
introducer_furl = "None"
- _all_upload_status = [upload.UploadStatus()]
- _all_download_status = [download.DownloadStatus()]
- _all_mapupdate_statuses = [servermap.UpdateStatus()]
- _all_publish_statuses = [publish.PublishStatus()]
- _all_retrieve_statuses = [retrieve.RetrieveStatus()]
convergence = "some random string"
stats_provider = FakeStatsProvider()
storage_broker = StorageFarmBroker(None, permute_peers=True)
def get_storage_broker(self):
return self.storage_broker
+ _secret_holder = None
+ def get_encoding_parameters(self):
+ return {"k": 3, "n": 10}
+ def get_history(self):
+ return FakeHistory()
- def create_node_from_uri(self, auri, readcap=None):
- if not auri:
- auri = readcap
- precondition(isinstance(auri, str), auri)
- u = uri.from_string(auri)
- if (IDirectoryURI.providedBy(u) or IReadonlyDirectoryURI.providedBy(u)):
- return FakeDirectoryNode(self).init_from_uri(u)
- if IFileURI.providedBy(u):
- return FakeCHKFileNode(u, self)
- if IMutableFileURI.providedBy(u):
- return FakeMutableFileNode(self).init_from_uri(u)
- raise UnhandledCapTypeError("cap '%s' is recognized, but has no Node" % auri)
+ def create_node_from_uri(self, writecap, readcap=None):
+ return self.nodemaker.create_from_cap(writecap, readcap)
def create_empty_dirnode(self):
- n = FakeDirectoryNode(self)
- d = n.create()
- d.addCallback(lambda res: n)
- return d
+ return self.nodemaker.create_new_mutable_directory()
MUTABLE_SIZELIMIT = FakeMutableFileNode.MUTABLE_SIZELIMIT
def create_mutable_file(self, contents=""):
- n = FakeMutableFileNode(self)
- return n.create(contents)
+ return self.nodemaker.create_mutable_file(contents)
def upload(self, uploadable):
- d = uploadable.get_size()
- d.addCallback(lambda size: uploadable.read(size))
- def _got_data(datav):
- data = "".join(datav)
- n = create_chk_filenode(self, data)
- results = upload.UploadResults()
- results.uri = n.get_uri()
- return results
- d.addCallback(_got_data)
- return d
-
- def list_all_upload_statuses(self):
- return self._all_upload_status
- def list_all_download_statuses(self):
- return self._all_download_status
- def list_all_mapupdate_statuses(self):
- return self._all_mapupdate_statuses
- def list_all_publish_statuses(self):
- return self._all_publish_statuses
- def list_all_retrieve_statuses(self):
- return self._all_retrieve_statuses
- def list_all_helper_statuses(self):
- return []
+ return self.uploader.upload(uploadable)
class WebMixin(object):
def setUp(self):
def makefile(self, number):
contents = "contents of file %s\n" % number
- n = create_chk_filenode(self.s, contents)
+ n = create_chk_filenode(contents)
return contents, n, n.get_uri()
def tearDown(self):
return d
def test_status(self):
- dl_num = self.s.list_all_download_statuses()[0].get_counter()
- ul_num = self.s.list_all_upload_statuses()[0].get_counter()
- mu_num = self.s.list_all_mapupdate_statuses()[0].get_counter()
- pub_num = self.s.list_all_publish_statuses()[0].get_counter()
- ret_num = self.s.list_all_retrieve_statuses()[0].get_counter()
+ h = self.s.get_history()
+ dl_num = h.list_all_download_statuses()[0].get_counter()
+ ul_num = h.list_all_upload_statuses()[0].get_counter()
+ mu_num = h.list_all_mapupdate_statuses()[0].get_counter()
+ pub_num = h.list_all_publish_statuses()[0].get_counter()
+ ret_num = h.list_all_retrieve_statuses()[0].get_counter()
d = self.GET("/status", followRedirect=True)
def _check(res):
self.failUnless('Upload and Download Status' in res, res)
base = "/file/%s" % urllib.quote(verifier_cap)
# client.create_node_from_uri() can't handle verify-caps
d = self.shouldFail2(error.Error, "GET_unhandled_URI_named",
- "400 Bad Request",
- "is not a valid file- or directory- cap",
+ "400 Bad Request", "is not a file-cap",
self.GET, base)
return d
# client.create_node_from_uri() can't handle verify-caps
d = self.shouldFail2(error.Error, "test_GET_unhandled_URI",
"400 Bad Request",
- "is not a valid file- or directory- cap",
+ "GET unknown URI type: can only do t=info",
self.GET, base)
return d
def test_PUT_overwrite_only_files(self):
# create a directory, put a file in that directory.
- contents, n, uri = self.makefile(8)
+ contents, n, filecap = self.makefile(8)
d = self.PUT(self.public_url + "/foo/dir?t=mkdir", "")
d.addCallback(lambda res:
self.PUT(self.public_url + "/foo/dir/file1.txt",
# (this should work)
d.addCallback(lambda res:
self.PUT(self.public_url + "/foo/dir/file1.txt?t=uri&replace=only-files",
- uri))
+ filecap))
d.addCallback(lambda res:
self.shouldFail2(error.Error, "PUT_bad_t", "409 Conflict",
"There was already a child by that name, and you asked me "
"to not replace it",
self.PUT, self.public_url + "/foo/dir?t=uri&replace=only-files",
- uri))
+ filecap))
return d
def test_PUT_NEWFILEURL(self):
def test_POST_upload_no_link_mutable(self):
d = self.POST("/uri", t="upload", mutable="true",
file=("new.txt", self.NEWFILE_CONTENTS))
- def _check(new_uri):
- new_uri = new_uri.strip()
- self.new_uri = new_uri
- u = IURI(new_uri)
- self.failUnless(IMutableFileURI.providedBy(u))
+ def _check(filecap):
+ filecap = filecap.strip()
+ self.failUnless(filecap.startswith("URI:SSK:"), filecap)
+ self.filecap = filecap
+ u = uri.WriteableSSKFileURI.init_from_string(filecap)
self.failUnless(u.storage_index in FakeMutableFileNode.all_contents)
- n = self.s.create_node_from_uri(new_uri)
+ n = self.s.create_node_from_uri(filecap)
return n.download_best_version()
d.addCallback(_check)
def _check2(data):
self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
- return self.GET("/uri/%s" % urllib.quote(self.new_uri))
+ return self.GET("/uri/%s" % urllib.quote(self.filecap))
d.addCallback(_check2)
def _check3(data):
self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
- return self.GET("/file/%s" % urllib.quote(self.new_uri))
+ return self.GET("/file/%s" % urllib.quote(self.filecap))
d.addCallback(_check3)
def _check4(data):
self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
def test_POST_upload_mutable_toobig(self):
d = self.shouldFail2(error.Error,
- "test_POST_upload_no_link_mutable_toobig",
+ "test_POST_upload_mutable_toobig",
"413 Request Entity Too Large",
"SDMF is limited to one segment, and 10001 > 10000",
self.POST,
def test_PUT_NEWFILE_URI_mutable(self):
file_contents = "New file contents here\n"
d = self.PUT("/uri?mutable=true", file_contents)
- def _check_mutable(uri):
- uri = uri.strip()
- u = IURI(uri)
- self.failUnless(IMutableFileURI.providedBy(u))
+ def _check1(filecap):
+ filecap = filecap.strip()
+ self.failUnless(filecap.startswith("URI:SSK:"), filecap)
+ self.filecap = filecap
+ u = uri.WriteableSSKFileURI.init_from_string(filecap)
self.failUnless(u.storage_index in FakeMutableFileNode.all_contents)
- n = self.s.create_node_from_uri(uri)
+ n = self.s.create_node_from_uri(filecap)
return n.download_best_version()
- d.addCallback(_check_mutable)
- def _check2_mutable(data):
+ d.addCallback(_check1)
+ def _check2(data):
self.failUnlessEqual(data, file_contents)
- d.addCallback(_check2_mutable)
- return d
-
- def _check(uri):
- self.failUnless(uri.to_string() in FakeCHKFileNode.all_contents)
- self.failUnlessEqual(FakeCHKFileNode.all_contents[uri.to_string()],
- file_contents)
- return self.GET("/uri/%s" % uri)
- d.addCallback(_check)
- def _check2(res):
- self.failUnlessEqual(res, file_contents)
+ return self.GET("/uri/%s" % urllib.quote(self.filecap))
d.addCallback(_check2)
+ def _check3(res):
+ self.failUnlessEqual(res, file_contents)
+ d.addCallback(_check3)
return d
def test_PUT_mkdir(self):
return DirectoryURIVerifier.init_from_string(s)
return UnknownURI(s)
-registerAdapter(from_string, str, IURI)
-
def is_uri(s):
try:
uri = from_string(s)
t = get_arg(req, "t", "").strip()
if t == "info":
return MoreInfo(self.node)
- raise WebError("GET unknown: can only do t=info, not t=%s" % t)
+ raise WebError("GET unknown URI type: can only do t=info, not t=%s" % t)
try:
node = self.client.create_node_from_uri(name)
except (TypeError, UnhandledCapTypeError, AssertionError):
+ # I think this can no longer be reached
raise WebError("'%s' is not a valid file- or directory- cap"
% name)
if not IFileNode.providedBy(node):
self.child_file = FileHandler(client)
self.child_named = FileHandler(client)
- self.child_status = status.Status(client) # TODO: use client.history
+ self.child_status = status.Status(client.get_history())
self.child_statistics = status.Statistics(client.stats_provider)
def child_helper_status(self, ctx):
docFactory = getxmlfile("status.xhtml")
addSlash = True
- def __init__(self, client):
- rend.Page.__init__(self, client)
- self.client = client
+ def __init__(self, history):
+ rend.Page.__init__(self, history)
+ self.history = history
def renderHTTP(self, ctx):
req = inevow.IRequest(ctx)
return simplejson.dumps(data, indent=1) + "\n"
def _get_all_statuses(self):
- c = self.client
- return itertools.chain(c.list_all_upload_statuses(),
- c.list_all_download_statuses(),
- c.list_all_mapupdate_statuses(),
- c.list_all_publish_statuses(),
- c.list_all_retrieve_statuses(),
- c.list_all_helper_statuses(),
+ h = self.history
+ return itertools.chain(h.list_all_upload_statuses(),
+ h.list_all_download_statuses(),
+ h.list_all_mapupdate_statuses(),
+ h.list_all_publish_statuses(),
+ h.list_all_retrieve_statuses(),
+ h.list_all_helper_statuses(),
)
def data_active_operations(self, ctx, data):
return ctx.tag
def childFactory(self, ctx, name):
- client = self.client
+ h = self.history
stype,count_s = name.split("-")
count = int(count_s)
if stype == "up":
- for s in itertools.chain(client.list_all_upload_statuses(),
- client.list_all_helper_statuses()):
+ for s in itertools.chain(h.list_all_upload_statuses(),
+ h.list_all_helper_statuses()):
# immutable-upload helpers use the same status object as a
# regular immutable-upload
if s.get_counter() == count:
return UploadStatusPage(s)
if stype == "down":
- for s in client.list_all_download_statuses():
+ for s in h.list_all_download_statuses():
if s.get_counter() == count:
return DownloadStatusPage(s)
if stype == "mapupdate":
- for s in client.list_all_mapupdate_statuses():
+ for s in h.list_all_mapupdate_statuses():
if s.get_counter() == count:
return MapupdateStatusPage(s)
if stype == "publish":
- for s in client.list_all_publish_statuses():
+ for s in h.list_all_publish_statuses():
if s.get_counter() == count:
return PublishStatusPage(s)
if stype == "retrieve":
- for s in client.list_all_retrieve_statuses():
+ for s in h.list_all_retrieve_statuses():
if s.get_counter() == count:
return RetrieveStatusPage(s)