assert IMutableFileURI.providedBy(u), u
return MutableFileNode(self).init_from_uri(u)
- def notify_publish(self, p):
- self.getServiceNamed("mutable-watcher").notify_publish(p)
- def notify_retrieve(self, r):
- self.getServiceNamed("mutable-watcher").notify_retrieve(r)
+ def notify_publish(self, publish_status):
+ self.getServiceNamed("mutable-watcher").notify_publish(publish_status)
+ def notify_retrieve(self, retrieve_status):
+ self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
def create_empty_dirnode(self):
n = NewDirectoryNode(self)
class MutableFileNode:
implements(IMutableFileNode)
- publish_class = Publish
- retrieve_class = Retrieve
SIGNATURE_KEY_SIZE = 2048
DEFAULT_ENCODING = (3, 10)
# nobody knows about us yet
self._current_seqnum = 0
self._current_roothash = "\x00"*32
- return self._publish(initial_contents)
+ return self._publish(None, initial_contents)
d.addCallback(_generated)
return d
def download_version(self, servermap, versionid):
"""Returns a Deferred that fires with a string."""
d = self.obtain_lock()
- d.addCallback(lambda res:
- Retrieve(self, servermap, versionid).download())
+ d.addCallback(lambda res: self._retrieve(servermap, versionid))
d.addBoth(self.release_lock)
return d
- def publish(self, servermap, newdata):
- assert self._pubkey, "update_servermap must be called before publish"
+ def publish(self, servermap, new_contents):
d = self.obtain_lock()
- d.addCallback(lambda res: Publish(self, servermap).publish(newdata))
- # p = self.publish_class(self)
- # self._client.notify_publish(p)
+ d.addCallback(lambda res: self._publish(servermap, new_contents))
d.addBoth(self.release_lock)
return d
verifier = self.get_verifier()
return self._client.getServiceNamed("checker").check(verifier)
- def download(self, target):
- # fake it. TODO: make this cleaner.
- d = self.download_to_data()
- def _done(data):
- target.open(len(data))
- target.write(data)
- target.close()
- return target.finish()
- d.addCallback(_done)
- return d
+
+ def _retrieve(self, servermap, verinfo):
+ r = Retrieve(self, servermap, verinfo)
+ self._client.notify_retrieve(r.get_status())
+ return r.download()
def _update_and_retrieve_best(self, old_map=None, mode=MODE_READ):
d = self.update_servermap(old_map=old_map, mode=mode)
d.addBoth(self.release_lock)
return d
- def _publish(self, initial_contents):
- p = Publish(self, None)
- d = p.publish(initial_contents)
- d.addCallback(lambda res: self)
+ def download(self, target):
+ # fake it. TODO: make this cleaner.
+ d = self.download_to_data()
+ def _done(data):
+ target.open(len(data))
+ target.write(data)
+ target.close()
+ return target.finish()
+ d.addCallback(_done)
return d
- def update(self, newdata):
+
+ def _publish(self, servermap, new_contents):
+ assert self._pubkey, "update_servermap must be called before publish"
+ p = Publish(self, servermap)
+ self._client.notify_publish(p.get_status())
+ return p.publish(new_contents)
+
+ def update(self, new_contents):
d = self.obtain_lock()
d.addCallback(lambda res: self.update_servermap(mode=MODE_WRITE))
- d.addCallback(lambda smap:
- Publish(self, smap).publish(newdata))
+ d.addCallback(self._publish, new_contents)
d.addBoth(self.release_lock)
return d
- def overwrite(self, newdata):
- return self.update(newdata)
+ def overwrite(self, new_contents):
+ return self.update(new_contents)
class MutableWatcher(service.MultiService):
def __init__(self, stats_provider=None):
service.MultiService.__init__(self)
self.stats_provider = stats_provider
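+ # weak keys: a status drops out of the "all" dict once nothing else references it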
- self._all_publish = weakref.WeakKeyDictionary()
+ self._all_publish_status = weakref.WeakKeyDictionary()
self._recent_publish_status = []
- self._all_retrieve = weakref.WeakKeyDictionary()
+ self._all_retrieve_status = weakref.WeakKeyDictionary()
self._recent_retrieve_status = []
def notify_publish(self, p):
- self._all_publish[p] = None
- self._recent_publish_status.append(p.get_status())
+ self._all_publish_status[p] = None
+ self._recent_publish_status.append(p)
if self.stats_provider:
self.stats_provider.count('mutable.files_published', 1)
- #self.stats_provider.count('mutable.bytes_published', p._node.get_size())
+ self.stats_provider.count('mutable.bytes_published', p.get_size())
while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
self._recent_publish_status.pop(0)
def list_all_publish(self):
- return self._all_publish.keys()
+ return self._all_publish_status.keys()
def list_active_publish(self):
- return [p.get_status() for p in self._all_publish.keys()
- if p.get_status().get_active()]
+ return [p for p in self._all_publish_status.keys() if p.get_active()]
def list_recent_publish(self):
return self._recent_publish_status
def notify_retrieve(self, r):
- self._all_retrieve[r] = None
- self._recent_retrieve_status.append(r.get_status())
+ self._all_retrieve_status[r] = None
+ self._recent_retrieve_status.append(r)
if self.stats_provider:
self.stats_provider.count('mutable.files_retrieved', 1)
- #self.stats_provider.count('mutable.bytes_retrieved', r._node.get_size())
+ self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
self._recent_retrieve_status.pop(0)
def list_all_retrieve(self):
- return self._all_retrieve.keys()
+ return self._all_retrieve_status.keys()
def list_active_retrieve(self):
- return [p.get_status() for p in self._all_retrieve.keys()
- if p.get_status().get_active()]
+ return [p for p in self._all_retrieve_status.keys() if p.get_active()]
def list_recent_retrieve(self):
return self._recent_retrieve_status
from itertools import count
from zope.interface import implements
from twisted.internet import defer
+from twisted.python import failure
from allmydata.interfaces import IPublishStatus
from allmydata.util import base32, hashutil, mathutil, idlib, log
from allmydata import hashtree, codec, storage
from pycryptopp.cipher.aes import AES
+from foolscap.eventual import eventually
from common import MODE_WRITE, UncoordinatedWriteError, DictOfSets
from servermap import ServerMap
statusid_counter = count(0)
def __init__(self):
self.timings = {}
- self.timings["per_server"] = {}
- self.privkey_from = None
- self.peers_queried = None
- self.sharemap = None # DictOfSets
+ self.timings["send_per_server"] = {}
+ self.servermap = None
self.problems = {}
self.active = True
self.storage_index = None
self.helper = False
self.encoding = ("?", "?")
- self.initial_read_size = None
self.size = None
self.status = "Not started"
self.progress = 0.0
self.counter = self.statusid_counter.next()
self.started = time.time()
- def add_per_server_time(self, peerid, op, elapsed):
- assert op in ("read", "write")
- if peerid not in self.timings["per_server"]:
- self.timings["per_server"][peerid] = []
- self.timings["per_server"][peerid].append((op,elapsed))
+ def add_per_server_time(self, peerid, elapsed):
+ if peerid not in self.timings["send_per_server"]:
+ self.timings["send_per_server"][peerid] = []
+ self.timings["send_per_server"][peerid].append(elapsed)
def get_started(self):
return self.started
return self.encoding
def using_helper(self):
return self.helper
+ def get_servermap(self):
+ return self.servermap
def get_size(self):
return self.size
def get_status(self):
self.storage_index = si
def set_helper(self, helper):
self.helper = helper
+ def set_servermap(self, servermap):
+ self.servermap = servermap
def set_encoding(self, k, n):
self.encoding = (k, n)
def set_size(self, size):
self._log_number = num
self._running = True
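+ # create a PublishStatus for the status-display web pages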
+ self._status = PublishStatus()
+ self._status.set_storage_index(self._storage_index)
+ self._status.set_helper(False)
+ self._status.set_progress(0.0)
+ self._status.set_active(True)
+ self._status.set_servermap(servermap)
+
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
kwargs['parent'] = self._log_number
# 5: when enough responses are back, we're done
self.log("starting publish, datalen is %s" % len(newdata))
+ self._status.set_size(len(newdata))
+ self._status.set_status("Started")
+ self._started = time.time()
self.done_deferred = defer.Deferred()
assert self.required_shares is not None
self.total_shares = self._node.get_total_shares()
assert self.total_shares is not None
+ self._status.set_encoding(self.required_shares, self.total_shares)
+
self._pubkey = self._node.get_pubkey()
assert self._pubkey
self._privkey = self._node.get_privkey()
# create the shares. We'll discard these as they are delivered. SMDF:
# we're allowed to hold everything in memory.
+ self._status.timings["setup"] = time.time() - self._started
d = self._encrypt_and_encode()
d.addCallback(self._generate_shares)
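+ # note when share placement begins, for the "push" timing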
+ def _start_pushing(res):
+ self._started_pushing = time.time()
+ return res
+ d.addCallback(_start_pushing)
d.addCallback(self.loop) # trigger delivery
d.addErrback(self._fatal_error)
self.log("error during loop", failure=f, level=log.SCARY)
self._done(f)
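+ # mirror our current placement progress into the status object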
+ def _update_status(self):
+ self._status.set_status("Sending Shares: %d placed out of %d, "
+ "%d messages outstanding" %
+ (len(self.placed),
+ len(self.goal),
+ len(self.outstanding)))
+ self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
+
def loop(self, ignored=None):
self.log("entering loop", level=log.NOISY)
+ if not self._running:
+ return
self.update_goal()
# how far are we from our goal?
needed = self.goal - self.placed - self.outstanding
+ self._update_status()
if needed:
# we need to send out new shares
# no queries outstanding, no placements needed: we're done
self.log("no queries outstanding, no placements needed: done",
level=log.OPERATIONAL)
+ now = time.time()
+ elapsed = now - self._started_pushing
+ self._status.timings["push"] = elapsed
return self._done(None)
def log_goal(self, goal):
# shares that we care about.
self.log("_encrypt_and_encode")
- #started = time.time()
+ self._status.set_status("Encrypting")
+ started = time.time()
key = hashutil.ssk_readkey_data_hash(self.salt, self.readkey)
enc = AES(key)
crypttext = enc.process(self.newdata)
assert len(crypttext) == len(self.newdata)
- #now = time.time()
- #self._status.timings["encrypt"] = now - started
- #started = now
+ now = time.time()
+ self._status.timings["encrypt"] = now - started
+ started = now
# now apply FEC
+ self._status.set_status("Encoding")
fec = codec.CRSEncoder()
fec.set_params(self.segment_size,
self.required_shares, self.total_shares)
d = fec.encode(crypttext_pieces)
def _done_encoding(res):
- #elapsed = time.time() - started
- #self._status.timings["encode"] = elapsed
+ elapsed = time.time() - started
+ self._status.timings["encode"] = elapsed
return res
d.addCallback(_done_encoding)
return d
def _generate_shares(self, shares_and_shareids):
# this sets self.shares and self.root_hash
self.log("_generate_shares")
- #started = time.time()
+ self._status.set_status("Generating Shares")
+ started = time.time()
# we should know these by now
privkey = self._privkey
# then they all share the same encprivkey at the end. The sizes
# of everything are the same for all shares.
- #sign_started = time.time()
+ sign_started = time.time()
signature = privkey.sign(prefix)
- #self._status.timings["sign"] = time.time() - sign_started
+ self._status.timings["sign"] = time.time() - sign_started
verification_key = pubkey.serialize()
all_shares[shnum],
encprivkey)
final_shares[shnum] = final_share
- #elapsed = time.time() - started
- #self._status.timings["pack"] = elapsed
+ elapsed = time.time() - started
+ self._status.timings["pack"] = elapsed
self.shares = final_shares
self.root_hash = root_hash
def _send_shares(self, needed):
self.log("_send_shares")
- #started = time.time()
# we're finally ready to send out our shares. If we encounter any
# surprises here, it's because somebody else is writing at the same
d.addErrback(self._fatal_error)
dl.append(d)
+ self._update_status()
return defer.DeferredList(dl) # purely for testing
def _do_testreadwrite(self, peerid, secrets,
for shnum in shnums:
self.outstanding.discard( (peerid, shnum) )
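+ # record this server's response time for the status display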
+ now = time.time()
+ elapsed = now - started
+ self._status.add_per_server_time(peerid, elapsed)
+
wrote, read_data = answer
if not wrote:
if not self._running:
return
self._running = False
- #now = time.time()
- #self._status.timings["total"] = now - self._started
- #self._status.set_active(False)
- #self._status.set_status("Done")
- #self._status.set_progress(1.0)
- self.done_deferred.callback(res)
- return None
+ now = time.time()
+ self._status.timings["total"] = now - self._started
+ self._status.set_active(False)
+ if isinstance(res, failure.Failure):
+ self.log("Retrieve done, with failure", failure=res)
+ self._status.set_status("Failed")
+ else:
+ self._status.set_status("Done")
+ self._status.set_progress(1.0)
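+ # fire done_deferred from the next reactor turn, so callers are not re-entered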
+ eventually(self.done_deferred.callback, res)
def get_status(self):
return self._status
self.timings = {}
self.timings["fetch_per_server"] = {}
self.timings["cumulative_verify"] = 0.0
- self.sharemap = {}
self.problems = {}
self.active = True
self.storage_index = None
self.helper = False
self.encoding = ("?","?")
- self.search_distance = None
self.size = None
self.status = "Not started"
self.progress = 0.0
return self.storage_index
def get_encoding(self):
return self.encoding
- def get_search_distance(self):
- return self.search_distance
def using_helper(self):
return self.helper
def get_size(self):
def get_counter(self):
return self.counter
+ def add_fetch_timing(self, peerid, elapsed):
+ if peerid not in self.timings["fetch_per_server"]:
+ self.timings["fetch_per_server"][peerid] = []
+ self.timings["fetch_per_server"][peerid].append(elapsed)
def set_storage_index(self, si):
self.storage_index = si
def set_helper(self, helper):
self.helper = helper
def set_encoding(self, k, n):
self.encoding = (k, n)
- def set_search_distance(self, value):
- self.search_distance = value
def set_size(self, size):
self.size = size
def set_status(self, status):
assert self._node._pubkey
self.verinfo = verinfo
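+ # build a RetrieveStatus for the status-display web pages, seeded from verinfo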
+ self._status = RetrieveStatus()
+ self._status.set_storage_index(self._storage_index)
+ self._status.set_helper(False)
+ self._status.set_progress(0.0)
+ self._status.set_active(True)
+ (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
+ offsets_tuple) = self.verinfo
+ self._status.set_size(datalength)
+ self._status.set_encoding(k, N)
+
+ def get_status(self):
+ return self._status
+
def log(self, *args, **kwargs):
if "parent" not in kwargs:
kwargs["parent"] = self._log_number
def download(self):
self._done_deferred = defer.Deferred()
+ self._started = time.time()
+ self._status.set_status("Retrieving Shares")
# first, which servers can we use?
versionmap = self.servermap.make_versionmap()
self._outstanding_queries[m] = (peerid, shnum, started)
# ask the cache first
+ got_from_cache = False
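+ # cache hits are tracked so they do not skew the per-server fetch timings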
datav = []
#for (offset, length) in readv:
#    (data, timestamp) = self._node._cache.read(self.verinfo, shnum,
#                                               offset, length)
#    if data is not None:
#        datav.append(data)
if len(datav) == len(readv):
self.log("got data from cache")
+ got_from_cache = True
d = defer.succeed(datav)
else:
self.remaining_sharemap[shnum].remove(peerid)
d = self._do_read(ss, peerid, self._storage_index, [shnum], readv)
d.addCallback(self._fill_cache, readv)
- d.addCallback(self._got_results, m, peerid, started)
+ d.addCallback(self._got_results, m, peerid, started, got_from_cache)
d.addErrback(self._query_failed, m, peerid)
# errors that aren't handled by _query_failed (and errors caused by
# _query_failed) get logged, but we still want to check for doneness.
for shnum in list(self.remaining_sharemap.keys()):
self.remaining_sharemap.discard(shnum, peerid)
- def _got_results(self, datavs, marker, peerid, started):
+ def _got_results(self, datavs, marker, peerid, started, got_from_cache):
+ now = time.time()
+ elapsed = now - started
+ if not got_from_cache:
+ self._status.add_fetch_timing(peerid, elapsed)
self.log(format="got results (%(shares)d shares) from [%(peerid)s]",
shares=len(datavs),
peerid=idlib.shortnodeid_b2a(peerid),
self.remove_peer(peerid)
self.servermap.mark_bad_share(peerid, shnum)
self._bad_shares.add( (peerid, shnum) )
+ self._status.problems[peerid] = f
self._last_failure = f
pass
# all done!
self.log(format="query to [%(peerid)s] failed",
peerid=idlib.shortnodeid_b2a(peerid),
level=log.NOISY)
+ self._status.problems[peerid] = f
self._outstanding_queries.pop(marker, None)
if not self._running:
return
# to fix it, so the download will fail.
self._decoding = True # avoid reentrancy
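+ # the fetch phase is over; record its duration before decoding starts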
+ self._status.set_status("decoding")
+ now = time.time()
+ elapsed = now - self._started
+ self._status.timings["fetch"] = elapsed
d = defer.maybeDeferred(self._decode)
d.addCallback(self._decrypt, IV, self._node._readkey)
peerid = list(self.remaining_sharemap[shnum])[0]
# get_data will remove that peerid from the sharemap, and add the
# query to self._outstanding_queries
+ self._status.set_status("Retrieving More Shares")
self.get_data(shnum, peerid)
needed -= 1
if not needed:
return
def _decode(self):
+ started = time.time()
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = self.verinfo
self.log("about to decode, shareids=%s" % (shareids,))
d = defer.maybeDeferred(fec.decode, shares, shareids)
def _done(buffers):
+ self._status.timings["decode"] = time.time() - started
self.log(" decode done, %d buffers" % len(buffers))
segment = "".join(buffers)
self.log(" joined length %d, datalength %d" %
return d
def _decrypt(self, crypttext, IV, readkey):
+ self._status.set_status("decrypting")
started = time.time()
key = hashutil.ssk_readkey_data_hash(IV, readkey)
decryptor = AES(key)
plaintext = decryptor.process(crypttext)
+ self._status.timings["decrypt"] = time.time() - started
return plaintext
def _done(self, res):
if not self._running:
return
self._running = False
+ self._status.set_active(False)
+ self._status.timings["total"] = time.time() - self._started
# res is either the new contents, or a Failure
if isinstance(res, failure.Failure):
self.log("Retrieve done, with failure", failure=res)
+ self._status.set_status("Failed")
else:
self.log("Retrieve done, success!")
+ self._status.set_status("Done")
+ self._status.set_progress(1.0)
# remember the encoding parameters, use them again next time
(seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
offsets_tuple) = self.verinfo
for (peerid, shnum)
in self.servermap])
+ def make_sharemap(self):
+ """Return a dict that maps shnum to a set of peerds that hold it."""
+ sharemap = DictOfSets()
+ for (peerid, shnum) in self.servermap:
+ sharemap.add(shnum, peerid)
+ return sharemap
+
def make_versionmap(self):
"""Return a dict that maps versionid to sets of (shnum, peerid,
timestamp) tuples."""
t.setServiceParent(self.parent)
t.listenOn("tcp:0")
t.setLocationAutomatically()
+ return eventual.fireEventually()
def tearDown(self):
d = self.parent.stopService()
<h2>Retrieve Results</h2>
<ul>
<li n:render="encoding" />
- <li n:render="peers_queried" />
<li n:render="problems" />
<li n:render="sharemap" />
<li>Timings:</li>
(<span n:render="rate" n:data="rate_total" />)</li>
<ul>
<li>Setup: <span n:render="time" n:data="time_setup" /></li>
- <li>Initial Version Query: <span n:render="time" n:data="time_query" />
- (read size <span n:render="string" n:data="initial_read_size"/> bytes)</li>
- <li>Obtain Privkey: <span n:render="time" n:data="time_privkey" />
- <ul>
- <li>Separate Privkey Fetch: <span n:render="time" n:data="time_privkey_fetch" /> <span n:render="privkey_from"/></li>
- </ul></li>
<li>Encrypting: <span n:render="time" n:data="time_encrypt" />
(<span n:render="rate" n:data="rate_encrypt" />)</li>
<li>Encoding: <span n:render="time" n:data="time_encode" />
<h2>Retrieve Results</h2>
<ul>
<li n:render="encoding" />
- <li n:render="search_distance" />
<li n:render="problems" />
<li>Timings:</li>
<ul>
<li>Total: <span n:render="time" n:data="time_total" />
(<span n:render="rate" n:data="rate_total" />)</li>
<ul>
- <li>Initial Peer Selection: <span n:render="time" n:data="time_peer_selection" /></li>
<li>Fetching: <span n:render="time" n:data="time_fetch" />
- (<span n:render="rate" n:data="rate_fetch" />)
- <ul>
- <li>Cumulative Verify: <span n:render="time" n:data="time_cumulative_verify" /></li>
- </ul></li>
+ (<span n:render="rate" n:data="rate_fetch" />)</li>
<li>Decoding: <span n:render="time" n:data="time_decode" />
(<span n:render="rate" n:data="rate_decode" />)</li>
<li>Decrypting: <span n:render="time" n:data="time_decrypt" />
k, n = data.get_encoding()
return ctx.tag["Encoding: %s of %s" % (k, n)]
- def render_search_distance(self, ctx, data):
- d = data.get_search_distance()
- return ctx.tag["Search Distance: %s peer%s" % (d, plural(d))]
-
def render_problems(self, ctx, data):
problems = data.problems
if not problems:
k, n = data.get_encoding()
return ctx.tag["Encoding: %s of %s" % (k, n)]
- def render_peers_queried(self, ctx, data):
- return ctx.tag["Peers Queried: ", data.peers_queried]
-
def render_sharemap(self, ctx, data):
- sharemap = data.sharemap
- if sharemap is None:
+ servermap = data.get_servermap()
+ if servermap is None:
return ctx.tag["None"]
l = T.ul()
+ sharemap = servermap.make_sharemap()
for shnum in sorted(sharemap.keys()):
l[T.li["%d -> Placed on " % shnum,
", ".join(["[%s]" % idlib.shortnodeid_b2a(peerid)
- for (peerid,seqnum,root_hash)
- in sharemap[shnum]])]]
+ for peerid in sharemap[shnum]])]]
return ctx.tag["Sharemap:", l]
def render_problems(self, ctx, data):
def data_time_setup(self, ctx, data):
return self.publish_status.timings.get("setup")
- def data_time_query(self, ctx, data):
- return self.publish_status.timings.get("query")
-
- def data_time_privkey(self, ctx, data):
- return self.publish_status.timings.get("privkey")
-
- def data_time_privkey_fetch(self, ctx, data):
- return self.publish_status.timings.get("privkey_fetch")
- def render_privkey_from(self, ctx, data):
- peerid = data.privkey_from
- if peerid:
- return " (got from [%s])" % idlib.shortnodeid_b2a(peerid)
- else:
- return ""
-
def data_time_encrypt(self, ctx, data):
return self.publish_status.timings.get("encrypt")
def data_rate_encrypt(self, ctx, data):
def data_rate_push(self, ctx, data):
return self._get_rate(data, "push")
- def data_initial_read_size(self, ctx, data):
- return self.publish_status.initial_read_size
-
def render_server_timings(self, ctx, data):
- per_server = self.publish_status.timings.get("per_server")
+ per_server = self.publish_status.timings.get("send_per_server")
if not per_server:
return ""
l = T.ul()
for peerid in sorted(per_server.keys()):
peerid_s = idlib.shortnodeid_b2a(peerid)
- times = []
- for op,t in per_server[peerid]:
- if op == "read":
- times.append( "(" + self.render_time(None, t) + ")" )
- else:
- times.append( self.render_time(None, t) )
- times_s = ", ".join(times)
+ times_s = ", ".join([self.render_time(None, t)
+ for t in per_server[peerid]])
l[T.li["[%s]: %s" % (peerid_s, times_s)]]
return T.li["Per-Server Response Times: ", l]