From: Zooko O'Whielacronx Date: Sun, 7 Dec 2008 15:20:08 +0000 (-0700) Subject: mutable: rename mutable/node.py to mutable/filenode.py and mutable/repair.py to mutab... X-Git-Url: https://git.rkrishnan.org/%5B/%5D%20/file/URI:LIT:krugkidfnzsc4/index.php?a=commitdiff_plain;h=b58875fe435e1fb7e44b5d60caa84dee87f2022c;p=tahoe-lafs%2Ftahoe-lafs.git mutable: rename mutable/node.py to mutable/filenode.py and mutable/repair.py to mutable/repairer.py To be more consistent with the immutable layout that I am working on. --- diff --git a/src/allmydata/client.py b/src/allmydata/client.py index ffbf9fe8..4583fcc2 100644 --- a/src/allmydata/client.py +++ b/src/allmydata/client.py @@ -22,7 +22,7 @@ from allmydata.util import hashutil, base32, pollmixin, cachedir from allmydata.util.abbreviate import parse_abbreviated_size from allmydata.uri import LiteralFileURI from allmydata.dirnode import NewDirectoryNode -from allmydata.mutable.node import MutableFileNode, MutableWatcher +from allmydata.mutable.filenode import MutableFileNode, MutableWatcher from allmydata.stats import StatsProvider from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \ IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, RIStubClient diff --git a/src/allmydata/dirnode.py b/src/allmydata/dirnode.py index 3caec6b4..7417a07e 100644 --- a/src/allmydata/dirnode.py +++ b/src/allmydata/dirnode.py @@ -5,7 +5,7 @@ from zope.interface import implements from twisted.internet import defer import simplejson from allmydata.mutable.common import NotMutableError -from allmydata.mutable.node import MutableFileNode +from allmydata.mutable.filenode import MutableFileNode from allmydata.interfaces import IMutableFileNode, IDirectoryNode,\ IURI, IFileNode, IMutableFileURI, IFilesystemNode, \ ExistingChildError, NoSuchChildError, ICheckable, IDeepCheckable diff --git a/src/allmydata/mutable/filenode.py b/src/allmydata/mutable/filenode.py new file mode 100644 index 00000000..882831b7 --- /dev/null +++ b/src/allmydata/mutable/filenode.py @@ -0,0 +1,484 @@ + +import weakref, random +from twisted.application import service + +from zope.interface import implements +from twisted.internet import defer, reactor +from foolscap.eventual import eventually +from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \ + ICheckable, ICheckerResults, NotEnoughSharesError +from allmydata.util import hashutil, log +from allmydata.util.assertutil import precondition +from allmydata.uri import WriteableSSKFileURI +from allmydata.monitor import Monitor +from pycryptopp.publickey import rsa +from pycryptopp.cipher.aes import AES + +from publish import Publish +from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \ + ResponseCache, UncoordinatedWriteError +from servermap import ServerMap, ServermapUpdater +from retrieve import Retrieve +from checker import MutableChecker, MutableCheckAndRepairer +from repairer import Repairer + + +class BackoffAgent: + # these parameters are copied from foolscap.reconnector, which gets them + # from twisted.internet.protocol.ReconnectingClientFactory + initialDelay = 1.0 + factor = 2.7182818284590451 # (math.e) + jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole + maxRetries = 4 + + def __init__(self): + self._delay = self.initialDelay + self._count = 0 + def delay(self, node, f): + self._count += 1 + if self._count == 4: + return f + self._delay = self._delay * self.factor + self._delay = random.normalvariate(self._delay, + self._delay * self.jitter) + d = defer.Deferred() + reactor.callLater(self._delay, d.callback, None) + return d + +# use client.create_mutable_file() to make one of these + +class MutableFileNode: + implements(IMutableFileNode, ICheckable) + SIGNATURE_KEY_SIZE = 2048 + checker_class = MutableChecker + check_and_repairer_class = MutableCheckAndRepairer + + def __init__(self, client): + self._client = client + self._pubkey = None # filled in upon first read + self._privkey = None # filled in if we're mutable + # we keep track of the last encoding parameters that we use. These + # are updated upon retrieve, and used by publish. If we publish + # without ever reading (i.e. overwrite()), then we use these values. + defaults = client.get_encoding_parameters() + self._required_shares = defaults["k"] + self._total_shares = defaults["n"] + self._sharemap = {} # known shares, shnum-to-[nodeids] + self._cache = ResponseCache() + + # all users of this MutableFileNode go through the serializer. This + # takes advantage of the fact that Deferreds discard the callbacks + # that they're done with, so we can keep using the same Deferred + # forever without consuming more and more memory. + self._serializer = defer.succeed(None) + + def __repr__(self): + if hasattr(self, '_uri'): + return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev()) + else: + return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None) + + def init_from_uri(self, myuri): + # we have the URI, but we have not yet retrieved the public + # verification key, nor things like 'k' or 'N'. If and when someone + # wants to get our contents, we'll pull from shares and fill those + # in. + self._uri = IMutableFileURI(myuri) + if not self._uri.is_readonly(): + self._writekey = self._uri.writekey + self._readkey = self._uri.readkey + self._storage_index = self._uri.storage_index + self._fingerprint = self._uri.fingerprint + # the following values are learned during Retrieval + # self._pubkey + # self._required_shares + # self._total_shares + # and these are needed for Publish. They are filled in by Retrieval + # if possible, otherwise by the first peer that Publish talks to. + self._privkey = None + self._encprivkey = None + return self + + def create(self, initial_contents, keypair_generator=None): + """Call this when the filenode is first created. This will generate + the keys, generate the initial shares, wait until at least numpeers + are connected, allocate shares, and upload the initial + contents. Returns a Deferred that fires (with the MutableFileNode + instance you should use) when it completes. + """ + + d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator) + d.addCallback(self._generated) + d.addCallback(lambda res: self._upload(initial_contents, None)) + return d + + def _generated(self, (pubkey, privkey) ): + self._pubkey, self._privkey = pubkey, privkey + pubkey_s = self._pubkey.serialize() + privkey_s = self._privkey.serialize() + self._writekey = hashutil.ssk_writekey_hash(privkey_s) + self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s) + self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) + self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint) + self._readkey = self._uri.readkey + self._storage_index = self._uri.storage_index + + def _generate_pubprivkeys(self, keypair_generator): + if keypair_generator: + return keypair_generator(self.SIGNATURE_KEY_SIZE) + else: + # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs + signer = rsa.generate(self.SIGNATURE_KEY_SIZE) + verifier = signer.get_verifying_key() + return verifier, signer + + def _encrypt_privkey(self, writekey, privkey): + enc = AES(writekey) + crypttext = enc.process(privkey) + return crypttext + + def _decrypt_privkey(self, enc_privkey): + enc = AES(self._writekey) + privkey = enc.process(enc_privkey) + return privkey + + def _populate_pubkey(self, pubkey): + self._pubkey = pubkey + def _populate_required_shares(self, required_shares): + self._required_shares = required_shares + def _populate_total_shares(self, total_shares): + self._total_shares = total_shares + + def _populate_privkey(self, privkey): + self._privkey = privkey + def _populate_encprivkey(self, encprivkey): + self._encprivkey = encprivkey + + + def get_write_enabler(self, peerid): + assert len(peerid) == 20 + return hashutil.ssk_write_enabler_hash(self._writekey, peerid) + def get_renewal_secret(self, peerid): + assert len(peerid) == 20 + crs = self._client.get_renewal_secret() + frs = hashutil.file_renewal_secret_hash(crs, self._storage_index) + return hashutil.bucket_renewal_secret_hash(frs, peerid) + def get_cancel_secret(self, peerid): + assert len(peerid) == 20 + ccs = self._client.get_cancel_secret() + fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index) + return hashutil.bucket_cancel_secret_hash(fcs, peerid) + + def get_writekey(self): + return self._writekey + def get_readkey(self): + return self._readkey + def get_storage_index(self): + return self._storage_index + def get_privkey(self): + return self._privkey + def get_encprivkey(self): + return self._encprivkey + def get_pubkey(self): + return self._pubkey + + def get_required_shares(self): + return self._required_shares + def get_total_shares(self): + return self._total_shares + + #################################### + # IFilesystemNode + + def get_uri(self): + return self._uri.to_string() + def get_size(self): + return "?" # TODO: this is likely to cause problems, not being an int + def get_readonly(self): + if self.is_readonly(): + return self + ro = MutableFileNode(self._client) + ro.init_from_uri(self._uri.get_readonly()) + return ro + + def get_readonly_uri(self): + return self._uri.get_readonly().to_string() + + def is_mutable(self): + return self._uri.is_mutable() + def is_readonly(self): + return self._uri.is_readonly() + + def __hash__(self): + return hash((self.__class__, self._uri)) + def __cmp__(self, them): + if cmp(type(self), type(them)): + return cmp(type(self), type(them)) + if cmp(self.__class__, them.__class__): + return cmp(self.__class__, them.__class__) + return cmp(self._uri, them._uri) + + def get_verifier(self): + return IMutableFileURI(self._uri).get_verifier() + + def _do_serialized(self, cb, *args, **kwargs): + # note: to avoid deadlock, this callable is *not* allowed to invoke + # other serialized methods within this (or any other) + # MutableFileNode. The callable should be a bound method of this same + # MFN instance. + d = defer.Deferred() + self._serializer.addCallback(lambda ignore: cb(*args, **kwargs)) + # we need to put off d.callback until this Deferred is finished being + # processed. Otherwise the caller's subsequent activities (like, + # doing other things with this node) can cause reentrancy problems in + # the Deferred code itself + self._serializer.addBoth(lambda res: eventually(d.callback, res)) + # add a log.err just in case something really weird happens, because + # self._serializer stays around forever, therefore we won't see the + # usual Unhandled Error in Deferred that would give us a hint. + self._serializer.addErrback(log.err) + return d + + ################################# + # ICheckable + + def check(self, monitor, verify=False): + checker = self.checker_class(self, monitor) + return checker.check(verify) + + def check_and_repair(self, monitor, verify=False): + checker = self.check_and_repairer_class(self, monitor) + return checker.check(verify) + + ################################# + # IRepairable + + def repair(self, checker_results, force=False): + assert ICheckerResults(checker_results) + r = Repairer(self, checker_results) + d = r.start(force) + return d + + + ################################# + # IMutableFileNode + + # allow the use of IDownloadTarget + def download(self, target): + # fake it. TODO: make this cleaner. + d = self.download_best_version() + def _done(data): + target.open(len(data)) + target.write(data) + target.close() + return target.finish() + d.addCallback(_done) + return d + + + # new API + + def download_best_version(self): + return self._do_serialized(self._download_best_version) + def _download_best_version(self): + servermap = ServerMap() + d = self._try_once_to_download_best_version(servermap, MODE_READ) + def _maybe_retry(f): + f.trap(NotEnoughSharesError) + # the download is worth retrying once. Make sure to use the + # old servermap, since it is what remembers the bad shares, + # but use MODE_WRITE to make it look for even more shares. + # TODO: consider allowing this to retry multiple times.. this + # approach will let us tolerate about 8 bad shares, I think. + return self._try_once_to_download_best_version(servermap, + MODE_WRITE) + d.addErrback(_maybe_retry) + return d + def _try_once_to_download_best_version(self, servermap, mode): + d = self._update_servermap(servermap, mode) + d.addCallback(self._once_updated_download_best_version, servermap) + return d + def _once_updated_download_best_version(self, ignored, servermap): + goal = servermap.best_recoverable_version() + if not goal: + raise UnrecoverableFileError("no recoverable versions") + return self._try_once_to_download_version(servermap, goal) + + def get_size_of_best_version(self): + d = self.get_servermap(MODE_READ) + def _got_servermap(smap): + ver = smap.best_recoverable_version() + if not ver: + raise UnrecoverableFileError("no recoverable version") + return smap.size_of_version(ver) + d.addCallback(_got_servermap) + return d + + def overwrite(self, new_contents): + return self._do_serialized(self._overwrite, new_contents) + def _overwrite(self, new_contents): + servermap = ServerMap() + d = self._update_servermap(servermap, mode=MODE_WRITE) + d.addCallback(lambda ignored: self._upload(new_contents, servermap)) + return d + + + def modify(self, modifier, backoffer=None): + """I use a modifier callback to apply a change to the mutable file. + I implement the following pseudocode:: + + obtain_mutable_filenode_lock() + first_time = True + while True: + update_servermap(MODE_WRITE) + old = retrieve_best_version() + new = modifier(old, servermap, first_time) + first_time = False + if new == old: break + try: + publish(new) + except UncoordinatedWriteError, e: + backoffer(e) + continue + break + release_mutable_filenode_lock() + + The idea is that your modifier function can apply a delta of some + sort, and it will be re-run as necessary until it succeeds. The + modifier must inspect the old version to see whether its delta has + already been applied: if so it should return the contents unmodified. + + Note that the modifier is required to run synchronously, and must not + invoke any methods on this MutableFileNode instance. + + The backoff-er is a callable that is responsible for inserting a + random delay between subsequent attempts, to help competing updates + from colliding forever. It is also allowed to give up after a while. + The backoffer is given two arguments: this MutableFileNode, and the + Failure object that contains the UncoordinatedWriteError. It should + return a Deferred that will fire when the next attempt should be + made, or return the Failure if the loop should give up. If + backoffer=None, a default one is provided which will perform + exponential backoff, and give up after 4 tries. Note that the + backoffer should not invoke any methods on this MutableFileNode + instance, and it needs to be highly conscious of deadlock issues. + """ + return self._do_serialized(self._modify, modifier, backoffer) + def _modify(self, modifier, backoffer): + servermap = ServerMap() + if backoffer is None: + backoffer = BackoffAgent().delay + return self._modify_and_retry(servermap, modifier, backoffer, True) + def _modify_and_retry(self, servermap, modifier, backoffer, first_time): + d = self._modify_once(servermap, modifier, first_time) + def _retry(f): + f.trap(UncoordinatedWriteError) + d2 = defer.maybeDeferred(backoffer, self, f) + d2.addCallback(lambda ignored: + self._modify_and_retry(servermap, modifier, + backoffer, False)) + return d2 + d.addErrback(_retry) + return d + def _modify_once(self, servermap, modifier, first_time): + d = self._update_servermap(servermap, MODE_WRITE) + d.addCallback(self._once_updated_download_best_version, servermap) + def _apply(old_contents): + new_contents = modifier(old_contents, servermap, first_time) + if new_contents is None or new_contents == old_contents: + # no changes need to be made + if first_time: + return + # However, since Publish is not automatically doing a + # recovery when it observes UCWE, we need to do a second + # publish. See #551 for details. We'll basically loop until + # we managed an uncontested publish. + new_contents = old_contents + precondition(isinstance(new_contents, str), + "Modifier function must return a string or None") + return self._upload(new_contents, servermap) + d.addCallback(_apply) + return d + + def get_servermap(self, mode): + return self._do_serialized(self._get_servermap, mode) + def _get_servermap(self, mode): + servermap = ServerMap() + return self._update_servermap(servermap, mode) + def _update_servermap(self, servermap, mode): + u = ServermapUpdater(self, Monitor(), servermap, mode) + self._client.notify_mapupdate(u.get_status()) + return u.update() + + def download_version(self, servermap, version, fetch_privkey=False): + return self._do_serialized(self._try_once_to_download_version, + servermap, version, fetch_privkey) + def _try_once_to_download_version(self, servermap, version, + fetch_privkey=False): + r = Retrieve(self, servermap, version, fetch_privkey) + self._client.notify_retrieve(r.get_status()) + return r.download() + + def upload(self, new_contents, servermap): + return self._do_serialized(self._upload, new_contents, servermap) + def _upload(self, new_contents, servermap): + assert self._pubkey, "update_servermap must be called before publish" + p = Publish(self, servermap) + self._client.notify_publish(p.get_status(), len(new_contents)) + return p.publish(new_contents) + + + + +class MutableWatcher(service.MultiService): + MAX_MAPUPDATE_STATUSES = 20 + MAX_PUBLISH_STATUSES = 20 + MAX_RETRIEVE_STATUSES = 20 + name = "mutable-watcher" + + def __init__(self, stats_provider=None): + service.MultiService.__init__(self) + self.stats_provider = stats_provider + self._all_mapupdate_status = weakref.WeakKeyDictionary() + self._recent_mapupdate_status = [] + self._all_publish_status = weakref.WeakKeyDictionary() + self._recent_publish_status = [] + self._all_retrieve_status = weakref.WeakKeyDictionary() + self._recent_retrieve_status = [] + + + def notify_mapupdate(self, p): + self._all_mapupdate_status[p] = None + self._recent_mapupdate_status.append(p) + while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES: + self._recent_mapupdate_status.pop(0) + + def notify_publish(self, p, size): + self._all_publish_status[p] = None + self._recent_publish_status.append(p) + if self.stats_provider: + self.stats_provider.count('mutable.files_published', 1) + # We must be told bytes_published as an argument, since the + # publish_status does not yet know how much data it will be asked + # to send. When we move to MDMF we'll need to find a better way + # to handle this. + self.stats_provider.count('mutable.bytes_published', size) + while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES: + self._recent_publish_status.pop(0) + + def notify_retrieve(self, r): + self._all_retrieve_status[r] = None + self._recent_retrieve_status.append(r) + if self.stats_provider: + self.stats_provider.count('mutable.files_retrieved', 1) + self.stats_provider.count('mutable.bytes_retrieved', r.get_size()) + while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES: + self._recent_retrieve_status.pop(0) + + + def list_all_mapupdate_statuses(self): + return self._all_mapupdate_status.keys() + def list_all_publish_statuses(self): + return self._all_publish_status.keys() + def list_all_retrieve_statuses(self): + return self._all_retrieve_status.keys() diff --git a/src/allmydata/mutable/node.py b/src/allmydata/mutable/node.py deleted file mode 100644 index cfa823d8..00000000 --- a/src/allmydata/mutable/node.py +++ /dev/null @@ -1,484 +0,0 @@ - -import weakref, random -from twisted.application import service - -from zope.interface import implements -from twisted.internet import defer, reactor -from foolscap.eventual import eventually -from allmydata.interfaces import IMutableFileNode, IMutableFileURI, \ - ICheckable, ICheckerResults, NotEnoughSharesError -from allmydata.util import hashutil, log -from allmydata.util.assertutil import precondition -from allmydata.uri import WriteableSSKFileURI -from allmydata.monitor import Monitor -from pycryptopp.publickey import rsa -from pycryptopp.cipher.aes import AES - -from publish import Publish -from common import MODE_READ, MODE_WRITE, UnrecoverableFileError, \ - ResponseCache, UncoordinatedWriteError -from servermap import ServerMap, ServermapUpdater -from retrieve import Retrieve -from checker import MutableChecker, MutableCheckAndRepairer -from repair import Repairer - - -class BackoffAgent: - # these parameters are copied from foolscap.reconnector, which gets them - # from twisted.internet.protocol.ReconnectingClientFactory - initialDelay = 1.0 - factor = 2.7182818284590451 # (math.e) - jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole - maxRetries = 4 - - def __init__(self): - self._delay = self.initialDelay - self._count = 0 - def delay(self, node, f): - self._count += 1 - if self._count == 4: - return f - self._delay = self._delay * self.factor - self._delay = random.normalvariate(self._delay, - self._delay * self.jitter) - d = defer.Deferred() - reactor.callLater(self._delay, d.callback, None) - return d - -# use client.create_mutable_file() to make one of these - -class MutableFileNode: - implements(IMutableFileNode, ICheckable) - SIGNATURE_KEY_SIZE = 2048 - checker_class = MutableChecker - check_and_repairer_class = MutableCheckAndRepairer - - def __init__(self, client): - self._client = client - self._pubkey = None # filled in upon first read - self._privkey = None # filled in if we're mutable - # we keep track of the last encoding parameters that we use. These - # are updated upon retrieve, and used by publish. If we publish - # without ever reading (i.e. overwrite()), then we use these values. - defaults = client.get_encoding_parameters() - self._required_shares = defaults["k"] - self._total_shares = defaults["n"] - self._sharemap = {} # known shares, shnum-to-[nodeids] - self._cache = ResponseCache() - - # all users of this MutableFileNode go through the serializer. This - # takes advantage of the fact that Deferreds discard the callbacks - # that they're done with, so we can keep using the same Deferred - # forever without consuming more and more memory. - self._serializer = defer.succeed(None) - - def __repr__(self): - if hasattr(self, '_uri'): - return "<%s %x %s %s>" % (self.__class__.__name__, id(self), self.is_readonly() and 'RO' or 'RW', self._uri.abbrev()) - else: - return "<%s %x %s %s>" % (self.__class__.__name__, id(self), None, None) - - def init_from_uri(self, myuri): - # we have the URI, but we have not yet retrieved the public - # verification key, nor things like 'k' or 'N'. If and when someone - # wants to get our contents, we'll pull from shares and fill those - # in. - self._uri = IMutableFileURI(myuri) - if not self._uri.is_readonly(): - self._writekey = self._uri.writekey - self._readkey = self._uri.readkey - self._storage_index = self._uri.storage_index - self._fingerprint = self._uri.fingerprint - # the following values are learned during Retrieval - # self._pubkey - # self._required_shares - # self._total_shares - # and these are needed for Publish. They are filled in by Retrieval - # if possible, otherwise by the first peer that Publish talks to. - self._privkey = None - self._encprivkey = None - return self - - def create(self, initial_contents, keypair_generator=None): - """Call this when the filenode is first created. This will generate - the keys, generate the initial shares, wait until at least numpeers - are connected, allocate shares, and upload the initial - contents. Returns a Deferred that fires (with the MutableFileNode - instance you should use) when it completes. - """ - - d = defer.maybeDeferred(self._generate_pubprivkeys, keypair_generator) - d.addCallback(self._generated) - d.addCallback(lambda res: self._upload(initial_contents, None)) - return d - - def _generated(self, (pubkey, privkey) ): - self._pubkey, self._privkey = pubkey, privkey - pubkey_s = self._pubkey.serialize() - privkey_s = self._privkey.serialize() - self._writekey = hashutil.ssk_writekey_hash(privkey_s) - self._encprivkey = self._encrypt_privkey(self._writekey, privkey_s) - self._fingerprint = hashutil.ssk_pubkey_fingerprint_hash(pubkey_s) - self._uri = WriteableSSKFileURI(self._writekey, self._fingerprint) - self._readkey = self._uri.readkey - self._storage_index = self._uri.storage_index - - def _generate_pubprivkeys(self, keypair_generator): - if keypair_generator: - return keypair_generator(self.SIGNATURE_KEY_SIZE) - else: - # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs - signer = rsa.generate(self.SIGNATURE_KEY_SIZE) - verifier = signer.get_verifying_key() - return verifier, signer - - def _encrypt_privkey(self, writekey, privkey): - enc = AES(writekey) - crypttext = enc.process(privkey) - return crypttext - - def _decrypt_privkey(self, enc_privkey): - enc = AES(self._writekey) - privkey = enc.process(enc_privkey) - return privkey - - def _populate_pubkey(self, pubkey): - self._pubkey = pubkey - def _populate_required_shares(self, required_shares): - self._required_shares = required_shares - def _populate_total_shares(self, total_shares): - self._total_shares = total_shares - - def _populate_privkey(self, privkey): - self._privkey = privkey - def _populate_encprivkey(self, encprivkey): - self._encprivkey = encprivkey - - - def get_write_enabler(self, peerid): - assert len(peerid) == 20 - return hashutil.ssk_write_enabler_hash(self._writekey, peerid) - def get_renewal_secret(self, peerid): - assert len(peerid) == 20 - crs = self._client.get_renewal_secret() - frs = hashutil.file_renewal_secret_hash(crs, self._storage_index) - return hashutil.bucket_renewal_secret_hash(frs, peerid) - def get_cancel_secret(self, peerid): - assert len(peerid) == 20 - ccs = self._client.get_cancel_secret() - fcs = hashutil.file_cancel_secret_hash(ccs, self._storage_index) - return hashutil.bucket_cancel_secret_hash(fcs, peerid) - - def get_writekey(self): - return self._writekey - def get_readkey(self): - return self._readkey - def get_storage_index(self): - return self._storage_index - def get_privkey(self): - return self._privkey - def get_encprivkey(self): - return self._encprivkey - def get_pubkey(self): - return self._pubkey - - def get_required_shares(self): - return self._required_shares - def get_total_shares(self): - return self._total_shares - - #################################### - # IFilesystemNode - - def get_uri(self): - return self._uri.to_string() - def get_size(self): - return "?" # TODO: this is likely to cause problems, not being an int - def get_readonly(self): - if self.is_readonly(): - return self - ro = MutableFileNode(self._client) - ro.init_from_uri(self._uri.get_readonly()) - return ro - - def get_readonly_uri(self): - return self._uri.get_readonly().to_string() - - def is_mutable(self): - return self._uri.is_mutable() - def is_readonly(self): - return self._uri.is_readonly() - - def __hash__(self): - return hash((self.__class__, self._uri)) - def __cmp__(self, them): - if cmp(type(self), type(them)): - return cmp(type(self), type(them)) - if cmp(self.__class__, them.__class__): - return cmp(self.__class__, them.__class__) - return cmp(self._uri, them._uri) - - def get_verifier(self): - return IMutableFileURI(self._uri).get_verifier() - - def _do_serialized(self, cb, *args, **kwargs): - # note: to avoid deadlock, this callable is *not* allowed to invoke - # other serialized methods within this (or any other) - # MutableFileNode. The callable should be a bound method of this same - # MFN instance. - d = defer.Deferred() - self._serializer.addCallback(lambda ignore: cb(*args, **kwargs)) - # we need to put off d.callback until this Deferred is finished being - # processed. Otherwise the caller's subsequent activities (like, - # doing other things with this node) can cause reentrancy problems in - # the Deferred code itself - self._serializer.addBoth(lambda res: eventually(d.callback, res)) - # add a log.err just in case something really weird happens, because - # self._serializer stays around forever, therefore we won't see the - # usual Unhandled Error in Deferred that would give us a hint. - self._serializer.addErrback(log.err) - return d - - ################################# - # ICheckable - - def check(self, monitor, verify=False): - checker = self.checker_class(self, monitor) - return checker.check(verify) - - def check_and_repair(self, monitor, verify=False): - checker = self.check_and_repairer_class(self, monitor) - return checker.check(verify) - - ################################# - # IRepairable - - def repair(self, checker_results, force=False): - assert ICheckerResults(checker_results) - r = Repairer(self, checker_results) - d = r.start(force) - return d - - - ################################# - # IMutableFileNode - - # allow the use of IDownloadTarget - def download(self, target): - # fake it. TODO: make this cleaner. - d = self.download_best_version() - def _done(data): - target.open(len(data)) - target.write(data) - target.close() - return target.finish() - d.addCallback(_done) - return d - - - # new API - - def download_best_version(self): - return self._do_serialized(self._download_best_version) - def _download_best_version(self): - servermap = ServerMap() - d = self._try_once_to_download_best_version(servermap, MODE_READ) - def _maybe_retry(f): - f.trap(NotEnoughSharesError) - # the download is worth retrying once. Make sure to use the - # old servermap, since it is what remembers the bad shares, - # but use MODE_WRITE to make it look for even more shares. - # TODO: consider allowing this to retry multiple times.. this - # approach will let us tolerate about 8 bad shares, I think. - return self._try_once_to_download_best_version(servermap, - MODE_WRITE) - d.addErrback(_maybe_retry) - return d - def _try_once_to_download_best_version(self, servermap, mode): - d = self._update_servermap(servermap, mode) - d.addCallback(self._once_updated_download_best_version, servermap) - return d - def _once_updated_download_best_version(self, ignored, servermap): - goal = servermap.best_recoverable_version() - if not goal: - raise UnrecoverableFileError("no recoverable versions") - return self._try_once_to_download_version(servermap, goal) - - def get_size_of_best_version(self): - d = self.get_servermap(MODE_READ) - def _got_servermap(smap): - ver = smap.best_recoverable_version() - if not ver: - raise UnrecoverableFileError("no recoverable version") - return smap.size_of_version(ver) - d.addCallback(_got_servermap) - return d - - def overwrite(self, new_contents): - return self._do_serialized(self._overwrite, new_contents) - def _overwrite(self, new_contents): - servermap = ServerMap() - d = self._update_servermap(servermap, mode=MODE_WRITE) - d.addCallback(lambda ignored: self._upload(new_contents, servermap)) - return d - - - def modify(self, modifier, backoffer=None): - """I use a modifier callback to apply a change to the mutable file. - I implement the following pseudocode:: - - obtain_mutable_filenode_lock() - first_time = True - while True: - update_servermap(MODE_WRITE) - old = retrieve_best_version() - new = modifier(old, servermap, first_time) - first_time = False - if new == old: break - try: - publish(new) - except UncoordinatedWriteError, e: - backoffer(e) - continue - break - release_mutable_filenode_lock() - - The idea is that your modifier function can apply a delta of some - sort, and it will be re-run as necessary until it succeeds. The - modifier must inspect the old version to see whether its delta has - already been applied: if so it should return the contents unmodified. - - Note that the modifier is required to run synchronously, and must not - invoke any methods on this MutableFileNode instance. - - The backoff-er is a callable that is responsible for inserting a - random delay between subsequent attempts, to help competing updates - from colliding forever. It is also allowed to give up after a while. - The backoffer is given two arguments: this MutableFileNode, and the - Failure object that contains the UncoordinatedWriteError. It should - return a Deferred that will fire when the next attempt should be - made, or return the Failure if the loop should give up. If - backoffer=None, a default one is provided which will perform - exponential backoff, and give up after 4 tries. Note that the - backoffer should not invoke any methods on this MutableFileNode - instance, and it needs to be highly conscious of deadlock issues. - """ - return self._do_serialized(self._modify, modifier, backoffer) - def _modify(self, modifier, backoffer): - servermap = ServerMap() - if backoffer is None: - backoffer = BackoffAgent().delay - return self._modify_and_retry(servermap, modifier, backoffer, True) - def _modify_and_retry(self, servermap, modifier, backoffer, first_time): - d = self._modify_once(servermap, modifier, first_time) - def _retry(f): - f.trap(UncoordinatedWriteError) - d2 = defer.maybeDeferred(backoffer, self, f) - d2.addCallback(lambda ignored: - self._modify_and_retry(servermap, modifier, - backoffer, False)) - return d2 - d.addErrback(_retry) - return d - def _modify_once(self, servermap, modifier, first_time): - d = self._update_servermap(servermap, MODE_WRITE) - d.addCallback(self._once_updated_download_best_version, servermap) - def _apply(old_contents): - new_contents = modifier(old_contents, servermap, first_time) - if new_contents is None or new_contents == old_contents: - # no changes need to be made - if first_time: - return - # However, since Publish is not automatically doing a - # recovery when it observes UCWE, we need to do a second - # publish. See #551 for details. We'll basically loop until - # we managed an uncontested publish. - new_contents = old_contents - precondition(isinstance(new_contents, str), - "Modifier function must return a string or None") - return self._upload(new_contents, servermap) - d.addCallback(_apply) - return d - - def get_servermap(self, mode): - return self._do_serialized(self._get_servermap, mode) - def _get_servermap(self, mode): - servermap = ServerMap() - return self._update_servermap(servermap, mode) - def _update_servermap(self, servermap, mode): - u = ServermapUpdater(self, Monitor(), servermap, mode) - self._client.notify_mapupdate(u.get_status()) - return u.update() - - def download_version(self, servermap, version, fetch_privkey=False): - return self._do_serialized(self._try_once_to_download_version, - servermap, version, fetch_privkey) - def _try_once_to_download_version(self, servermap, version, - fetch_privkey=False): - r = Retrieve(self, servermap, version, fetch_privkey) - self._client.notify_retrieve(r.get_status()) - return r.download() - - def upload(self, new_contents, servermap): - return self._do_serialized(self._upload, new_contents, servermap) - def _upload(self, new_contents, servermap): - assert self._pubkey, "update_servermap must be called before publish" - p = Publish(self, servermap) - self._client.notify_publish(p.get_status(), len(new_contents)) - return p.publish(new_contents) - - - - -class MutableWatcher(service.MultiService): - MAX_MAPUPDATE_STATUSES = 20 - MAX_PUBLISH_STATUSES = 20 - MAX_RETRIEVE_STATUSES = 20 - name = "mutable-watcher" - - def __init__(self, stats_provider=None): - service.MultiService.__init__(self) - self.stats_provider = stats_provider - self._all_mapupdate_status = weakref.WeakKeyDictionary() - self._recent_mapupdate_status = [] - self._all_publish_status = weakref.WeakKeyDictionary() - self._recent_publish_status = [] - self._all_retrieve_status = weakref.WeakKeyDictionary() - self._recent_retrieve_status = [] - - - def notify_mapupdate(self, p): - self._all_mapupdate_status[p] = None - self._recent_mapupdate_status.append(p) - while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES: - self._recent_mapupdate_status.pop(0) - - def notify_publish(self, p, size): - self._all_publish_status[p] = None - self._recent_publish_status.append(p) - if self.stats_provider: - self.stats_provider.count('mutable.files_published', 1) - # We must be told bytes_published as an argument, since the - # publish_status does not yet know how much data it will be asked - # to send. When we move to MDMF we'll need to find a better way - # to handle this. - self.stats_provider.count('mutable.bytes_published', size) - while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES: - self._recent_publish_status.pop(0) - - def notify_retrieve(self, r): - self._all_retrieve_status[r] = None - self._recent_retrieve_status.append(r) - if self.stats_provider: - self.stats_provider.count('mutable.files_retrieved', 1) - self.stats_provider.count('mutable.bytes_retrieved', r.get_size()) - while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES: - self._recent_retrieve_status.pop(0) - - - def list_all_mapupdate_statuses(self): - return self._all_mapupdate_status.keys() - def list_all_publish_statuses(self): - return self._all_publish_status.keys() - def list_all_retrieve_statuses(self): - return self._all_retrieve_status.keys() diff --git a/src/allmydata/mutable/repair.py b/src/allmydata/mutable/repair.py deleted file mode 100644 index f3ae1ce6..00000000 --- a/src/allmydata/mutable/repair.py +++ /dev/null @@ -1,98 +0,0 @@ - -from zope.interface import implements -from allmydata.interfaces import IRepairResults, ICheckerResults - -class RepairResults: - implements(IRepairResults) - - def __init__(self, smap): - self.servermap = smap - - def to_string(self): - return "" - -class MustForceRepairError(Exception): - pass - -class Repairer: - def __init__(self, node, checker_results): - self.node = node - self.checker_results = ICheckerResults(checker_results) - assert checker_results.storage_index == self.node.get_storage_index() - - def start(self, force=False): - # download, then re-publish. If a server had a bad share, try to - # replace it with a good one of the same shnum. - - # The normal repair operation should not be used to replace - # application-specific merging of alternate versions: i.e if there - # are multiple highest seqnums with different roothashes. In this - # case, the application must use node.upload() (referencing the - # servermap that indicates the multiple-heads condition), or - # node.overwrite(). The repair() operation will refuse to run in - # these conditions unless a force=True argument is provided. If - # force=True is used, then the highest root hash will be reinforced. - - # Likewise, the presence of an unrecoverable latest version is an - # unusual event, and should ideally be handled by retrying a couple - # times (spaced out over hours or days) and hoping that new shares - # will become available. If repair(force=True) is called, data will - # be lost: a new seqnum will be generated with the same contents as - # the most recent recoverable version, skipping over the lost - # version. repair(force=False) will refuse to run in a situation like - # this. - - # Repair is designed to fix the following injuries: - # missing shares: add new ones to get at least N distinct ones - # old shares: replace old shares with the latest version - # bogus shares (bad sigs): replace the bad one with a good one - - smap = self.checker_results.get_servermap() - - if smap.unrecoverable_newer_versions(): - if not force: - raise MustForceRepairError("There were unrecoverable newer " - "versions, so force=True must be " - "passed to the repair() operation") - # continuing on means that node.upload() will pick a seqnum that - # is higher than everything visible in the servermap, effectively - # discarding the unrecoverable versions. - if smap.needs_merge(): - if not force: - raise MustForceRepairError("There were multiple recoverable " - "versions with identical seqnums, " - "so force=True must be passed to " - "the repair() operation") - # continuing on means that smap.best_recoverable_version() will - # pick the one with the highest roothash, and then node.upload() - # will replace all shares with its contents - - # missing shares are handled during upload, which tries to find a - # home for every share - - # old shares are handled during upload, which will replace any share - # that was present in the servermap - - # bogus shares need to be managed here. We might notice a bogus share - # during mapupdate (whether done for a filecheck or just before a - # download) by virtue of it having an invalid signature. We might - # also notice a bad hash in the share during verify or download. In - # either case, the problem will be noted in the servermap, and the - # bad share (along with its checkstring) will be recorded in - # servermap.bad_shares . Publish knows that it should try and replace - # these. - - # I chose to use the retrieve phase to ensure that the privkey is - # available, to avoid the extra roundtrip that would occur if we, - # say, added an smap.get_privkey() method. - - assert self.node.get_writekey() # repair currently requires a writecap - - best_version = smap.best_recoverable_version() - d = self.node.download_version(smap, best_version, fetch_privkey=True) - d.addCallback(self.node.upload, smap) - d.addCallback(self.get_results, smap) - return d - - def get_results(self, res, smap): - return RepairResults(smap) diff --git a/src/allmydata/mutable/repairer.py b/src/allmydata/mutable/repairer.py new file mode 100644 index 00000000..f3ae1ce6 --- /dev/null +++ b/src/allmydata/mutable/repairer.py @@ -0,0 +1,98 @@ + +from zope.interface import implements +from allmydata.interfaces import IRepairResults, ICheckerResults + +class RepairResults: + implements(IRepairResults) + + def __init__(self, smap): + self.servermap = smap + + def to_string(self): + return "" + +class MustForceRepairError(Exception): + pass + +class Repairer: + def __init__(self, node, checker_results): + self.node = node + self.checker_results = ICheckerResults(checker_results) + assert checker_results.storage_index == self.node.get_storage_index() + + def start(self, force=False): + # download, then re-publish. If a server had a bad share, try to + # replace it with a good one of the same shnum. + + # The normal repair operation should not be used to replace + # application-specific merging of alternate versions: i.e if there + # are multiple highest seqnums with different roothashes. In this + # case, the application must use node.upload() (referencing the + # servermap that indicates the multiple-heads condition), or + # node.overwrite(). The repair() operation will refuse to run in + # these conditions unless a force=True argument is provided. If + # force=True is used, then the highest root hash will be reinforced. + + # Likewise, the presence of an unrecoverable latest version is an + # unusual event, and should ideally be handled by retrying a couple + # times (spaced out over hours or days) and hoping that new shares + # will become available. If repair(force=True) is called, data will + # be lost: a new seqnum will be generated with the same contents as + # the most recent recoverable version, skipping over the lost + # version. repair(force=False) will refuse to run in a situation like + # this. + + # Repair is designed to fix the following injuries: + # missing shares: add new ones to get at least N distinct ones + # old shares: replace old shares with the latest version + # bogus shares (bad sigs): replace the bad one with a good one + + smap = self.checker_results.get_servermap() + + if smap.unrecoverable_newer_versions(): + if not force: + raise MustForceRepairError("There were unrecoverable newer " + "versions, so force=True must be " + "passed to the repair() operation") + # continuing on means that node.upload() will pick a seqnum that + # is higher than everything visible in the servermap, effectively + # discarding the unrecoverable versions. + if smap.needs_merge(): + if not force: + raise MustForceRepairError("There were multiple recoverable " + "versions with identical seqnums, " + "so force=True must be passed to " + "the repair() operation") + # continuing on means that smap.best_recoverable_version() will + # pick the one with the highest roothash, and then node.upload() + # will replace all shares with its contents + + # missing shares are handled during upload, which tries to find a + # home for every share + + # old shares are handled during upload, which will replace any share + # that was present in the servermap + + # bogus shares need to be managed here. We might notice a bogus share + # during mapupdate (whether done for a filecheck or just before a + # download) by virtue of it having an invalid signature. We might + # also notice a bad hash in the share during verify or download. In + # either case, the problem will be noted in the servermap, and the + # bad share (along with its checkstring) will be recorded in + # servermap.bad_shares . Publish knows that it should try and replace + # these. + + # I chose to use the retrieve phase to ensure that the privkey is + # available, to avoid the extra roundtrip that would occur if we, + # say, added an smap.get_privkey() method. + + assert self.node.get_writekey() # repair currently requires a writecap + + best_version = smap.best_recoverable_version() + d = self.node.download_version(smap, best_version, fetch_privkey=True) + d.addCallback(self.node.upload, smap) + d.addCallback(self.get_results, smap) + return d + + def get_results(self, res, smap): + return RepairResults(smap) diff --git a/src/allmydata/test/test_filenode.py b/src/allmydata/test/test_filenode.py index 025e76e6..46db7fcd 100644 --- a/src/allmydata/test/test_filenode.py +++ b/src/allmydata/test/test_filenode.py @@ -3,7 +3,7 @@ from twisted.trial import unittest from allmydata import uri from allmydata.monitor import Monitor from allmydata.immutable import filenode, download -from allmydata.mutable.node import MutableFileNode +from allmydata.mutable.filenode import MutableFileNode from allmydata.util import hashutil, cachedir from allmydata.test.common import download_to_data diff --git a/src/allmydata/test/test_mutable.py b/src/allmydata/test/test_mutable.py index 055b4649..d2cc36a6 100644 --- a/src/allmydata/test/test_mutable.py +++ b/src/allmydata/test/test_mutable.py @@ -18,7 +18,7 @@ from foolscap.eventual import eventually, fireEventually from foolscap.logging import log import sha -from allmydata.mutable.node import MutableFileNode, BackoffAgent +from allmydata.mutable.filenode import MutableFileNode, BackoffAgent from allmydata.mutable.common import DictOfSets, ResponseCache, \ MODE_CHECK, MODE_ANYTHING, MODE_WRITE, MODE_READ, \ NeedMoreDataError, UnrecoverableFileError, UncoordinatedWriteError, \ @@ -27,7 +27,7 @@ from allmydata.mutable.retrieve import Retrieve from allmydata.mutable.publish import Publish from allmydata.mutable.servermap import ServerMap, ServermapUpdater from allmydata.mutable.layout import unpack_header, unpack_share -from allmydata.mutable.repair import MustForceRepairError +from allmydata.mutable.repairer import MustForceRepairError import common_util as testutil