import os, stat, time, weakref
from zope.interface import implements
-from twisted.application import service
from twisted.internet import defer
-from foolscap import Referenceable, DeadReferenceError
-from foolscap.eventual import eventually
+from foolscap.api import Referenceable, DeadReferenceError, eventually
import allmydata # for __full_version__
-from allmydata import interfaces, storage, uri
+from allmydata import interfaces, uri
+from allmydata.storage.server import si_b2a
from allmydata.immutable import upload
from allmydata.immutable.layout import ReadBucketProxy
from allmydata.util.assertutil import precondition
-from allmydata.util import idlib, log, observer, fileutil, hashutil, dictutil
+from allmydata.util import log, observer, fileutil, hashutil, dictutil
class NotEnoughWritersError(Exception):
def _get_all_shareholders(self, storage_index):
dl = []
- for (peerid, ss) in self._peer_getter("storage", storage_index):
- d = ss.callRemote("get_buckets", storage_index)
+ for s in self._peer_getter(storage_index):
+ d = s.get_rref().callRemote("get_buckets", storage_index)
d.addCallbacks(self._got_response, self._got_error,
- callbackArgs=(peerid,))
+ callbackArgs=(s,))
dl.append(d)
return defer.DeferredList(dl)
- def _got_response(self, buckets, peerid):
+ def _got_response(self, buckets, server):
# buckets is a dict: maps shnum to an rref of the server who holds it
shnums_s = ",".join([str(shnum) for shnum in buckets])
self.log("got_response: [%s] has %d shares (%s)" %
- (idlib.shortnodeid_b2a(peerid), len(buckets), shnums_s),
+ (server.get_name(), len(buckets), shnums_s),
level=log.NOISY)
self._found_shares.update(buckets.keys())
for k in buckets:
- self._sharemap.add(k, peerid)
- self._readers.update( [ (bucket, peerid)
+ self._sharemap.add(k, server.get_serverid())
+ self._readers.update( [ (bucket, server)
for bucket in buckets.values() ] )
def _got_error(self, f):
if not self._readers:
self.log("no readers, so no UEB", level=log.NOISY)
return
- b,peerid = self._readers.pop()
- rbp = ReadBucketProxy(b, peerid, storage.si_b2a(self._storage_index))
+ b,server = self._readers.pop()
+ rbp = ReadBucketProxy(b, server, si_b2a(self._storage_index))
d = rbp.get_uri_extension()
d.addCallback(self._got_uri_extension)
d.addErrback(self._ueb_error)
"application-version": str(allmydata.__full_version__),
}
- def __init__(self, storage_index, helper,
+ def __init__(self, storage_index,
+ helper, storage_broker, secret_holder,
incoming_file, encoding_file,
- results, log_number):
+ log_number):
self._storage_index = storage_index
self._helper = helper
self._incoming_file = incoming_file
self._encoding_file = encoding_file
- self._upload_id = storage.si_b2a(storage_index)[:5]
+ self._upload_id = si_b2a(storage_index)[:5]
self._log_number = log_number
- self._results = results
self._upload_status = upload.UploadStatus()
self._upload_status.set_helper(False)
self._upload_status.set_storage_index(storage_index)
self._helper.log("CHKUploadHelper starting for SI %s" % self._upload_id,
parent=log_number)
- self._client = helper.parent
+ self._storage_broker = storage_broker
+ self._secret_holder = secret_holder
self._fetcher = CHKCiphertextFetcher(self, incoming_file, encoding_file,
self._log_number)
self._reader = LocalCiphertextReader(self, storage_index, encoding_file)
self._finished_observers = observer.OneShotObserverList()
+ self._started = time.time()
d = self._fetcher.when_done()
d.addCallback(lambda res: self._reader.start())
d.addCallback(lambda res: self.start_encrypted(self._reader))
kwargs['facility'] = "tahoe.helper.chk"
return upload.CHKUploader.log(self, *args, **kwargs)
- def start(self):
- self._started = time.time()
- # determine if we need to upload the file. If so, return ({},self) .
- # If not, return (UploadResults,None) .
+ def remote_get_version(self):
+ return self.VERSION
+
+ def remote_upload(self, reader):
+ # reader is an RIEncryptedUploadable. I am specified to return an
+ # UploadResults dictionary.
+
+ # Decide whether we need to fetch any ciphertext from the client.
self.log("deciding whether to upload the file or not", level=log.NOISY)
if os.path.exists(self._encoding_file):
# we have the whole file, and we might be encoding it (or the
# encode/upload might have failed, and we need to restart it).
self.log("ciphertext already in place", level=log.UNUSUAL)
- return (self._results, self)
- if os.path.exists(self._incoming_file):
+ elif os.path.exists(self._incoming_file):
# we have some of the file, but not all of it (otherwise we'd be
# encoding). The caller might be useful.
self.log("partial ciphertext already present", level=log.UNUSUAL)
- return (self._results, self)
- # we don't remember uploading this file
- self.log("no ciphertext yet", level=log.NOISY)
- return (self._results, self)
-
- def remote_get_version(self):
- return self.VERSION
-
- def remote_upload(self, reader):
- # reader is an RIEncryptedUploadable. I am specified to return an
- # UploadResults dictionary.
+ else:
+ # we don't remember uploading this file
+ self.log("no ciphertext yet", level=log.NOISY)
# let our fetcher pull ciphertext from the reader.
self._fetcher.add_reader(reader)
# and inform the client when the upload has finished
return self._finished_observers.when_fired()
- def _finished(self, uploadresults):
- precondition(isinstance(uploadresults.verifycapstr, str), uploadresults.verifycapstr)
- assert interfaces.IUploadResults.providedBy(uploadresults), uploadresults
- r = uploadresults
- v = uri.from_string(r.verifycapstr)
- r.uri_extension_hash = v.uri_extension_hash
+ def _finished(self, ur):
+ assert interfaces.IUploadResults.providedBy(ur), ur
+ vcapstr = ur.get_verifycapstr()
+ precondition(isinstance(vcapstr, str), vcapstr)
+ v = uri.from_string(vcapstr)
f_times = self._fetcher.get_times()
- r.timings["cumulative_fetch"] = f_times["cumulative_fetch"]
- r.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
- r.timings["total_fetch"] = f_times["total"]
+
+ hur = upload.HelperUploadResults()
+ hur.timings = {"cumulative_fetch": f_times["cumulative_fetch"],
+ "total_fetch": f_times["total"],
+ }
+ for key,val in ur.get_timings().items():
+ hur.timings[key] = val
+ hur.uri_extension_hash = v.uri_extension_hash
+ hur.ciphertext_fetched = self._fetcher.get_ciphertext_fetched()
+ hur.preexisting_shares = ur.get_preexisting_shares()
+ # hur.sharemap needs to be {shnum: set(serverid)}
+ hur.sharemap = {}
+ for shnum, servers in ur.get_sharemap().items():
+ hur.sharemap[shnum] = set([s.get_serverid() for s in servers])
+ # and hur.servermap needs to be {serverid: set(shnum)}
+ hur.servermap = {}
+ for server, shnums in ur.get_servermap().items():
+ hur.servermap[server.get_serverid()] = set(shnums)
+ hur.pushed_shares = ur.get_pushed_shares()
+ hur.file_size = ur.get_file_size()
+ hur.uri_extension_data = ur.get_uri_extension_data()
+ hur.verifycapstr = vcapstr
+
self._reader.close()
os.unlink(self._encoding_file)
- self._finished_observers.fire(r)
+ self._finished_observers.fire(hur)
self._helper.upload_finished(self._storage_index, v.size)
del self._reader
def _failed(self, f):
self.log(format="CHKUploadHelper(%(si)s) failed",
- si=storage.si_b2a(self._storage_index)[:5],
+ si=si_b2a(self._storage_index)[:5],
failure=f,
level=log.UNUSUAL)
self._finished_observers.fire(f)
if os.path.exists(self._encoding_file):
self.log("ciphertext already present, bypassing fetch",
level=log.UNUSUAL)
+ # XXX the following comment is probably stale, since
+ # LocalCiphertextReader.get_plaintext_hashtree_leaves does not exist.
+ #
# we'll still need the plaintext hashes (when
# LocalCiphertextReader.get_plaintext_hashtree_leaves() is
# called), and currently the easiest way to get them is to ask
# else.
have = os.stat(self._encoding_file)[stat.ST_SIZE]
d = self.call("read_encrypted", have-1, 1)
- d.addCallback(self._done2, started)
- return
-
- # first, find out how large the file is going to be
- d = self.call("get_size")
- d.addCallback(self._got_size)
- d.addCallback(self._start_reading)
- d.addCallback(self._done)
+ else:
+ # first, find out how large the file is going to be
+ d = self.call("get_size")
+ d.addCallback(self._got_size)
+ d.addCallback(self._start_reading)
+ d.addCallback(self._done)
d.addCallback(self._done2, started)
d.addErrback(self._failed)
d = defer.maybeDeferred(self.f.read, length)
d.addCallback(lambda data: [data])
return d
- def get_plaintext_hashtree_leaves(self, first, last, num_segments):
- return self.call("get_plaintext_hashtree_leaves", first, last,
- num_segments)
- def get_plaintext_hash(self):
- return self.call("get_plaintext_hash")
+
def close(self):
self.f.close()
# ??. I'm not sure if it makes sense to forward the close message.
-class Helper(Referenceable, service.MultiService):
+class Helper(Referenceable):
implements(interfaces.RIHelper, interfaces.IStatsProducer)
# this is the non-distributed version. When we need to have multiple
# helpers, this object will become the HelperCoordinator, and will query
{ },
"application-version": str(allmydata.__full_version__),
}
- chk_upload_helper_class = CHKUploadHelper
MAX_UPLOAD_STATUSES = 10
- def __init__(self, basedir, stats_provider=None):
+ def __init__(self, basedir, storage_broker, secret_holder,
+ stats_provider, history):
self._basedir = basedir
+ self._storage_broker = storage_broker
+ self._secret_holder = secret_holder
self._chk_incoming = os.path.join(basedir, "CHK_incoming")
self._chk_encoding = os.path.join(basedir, "CHK_encoding")
fileutil.make_dirs(self._chk_incoming)
fileutil.make_dirs(self._chk_encoding)
self._active_uploads = {}
self._all_uploads = weakref.WeakKeyDictionary() # for debugging
- self._all_upload_statuses = weakref.WeakKeyDictionary()
- self._recent_upload_statuses = []
self.stats_provider = stats_provider
if stats_provider:
stats_provider.register_producer(self)
"chk_upload_helper.fetched_bytes": 0,
"chk_upload_helper.encoded_bytes": 0,
}
- service.MultiService.__init__(self)
-
- def setServiceParent(self, parent):
- service.MultiService.setServiceParent(self, parent)
+ self._history = history
def log(self, *args, **kwargs):
if 'facility' not in kwargs:
kwargs['facility'] = "tahoe.helper"
- return self.parent.log(*args, **kwargs)
+ return log.msg(*args, **kwargs)
def count(self, key, value=1):
if self.stats_provider:
def remote_upload_chk(self, storage_index):
self.count("chk_upload_helper.upload_requests")
- r = upload.UploadResults()
- started = time.time()
- si_s = storage.si_b2a(storage_index)
- lp = self.log(format="helper: upload_chk query for SI %(si)s", si=si_s)
- incoming_file = os.path.join(self._chk_incoming, si_s)
- encoding_file = os.path.join(self._chk_encoding, si_s)
+ lp = self.log(format="helper: upload_chk query for SI %(si)s",
+ si=si_b2a(storage_index))
if storage_index in self._active_uploads:
self.log("upload is currently active", parent=lp)
uh = self._active_uploads[storage_index]
- return uh.start()
-
- d = self._check_for_chk_already_in_grid(storage_index, r, lp)
- def _checked(already_present):
- elapsed = time.time() - started
- r.timings['existence_check'] = elapsed
- if already_present:
- # the necessary results are placed in the UploadResults
- self.count("chk_upload_helper.upload_already_present")
- self.log("file already found in grid", parent=lp)
- return (r, None)
-
- self.count("chk_upload_helper.upload_need_upload")
- # the file is not present in the grid, by which we mean there are
- # less than 'N' shares available.
- self.log("unable to find file in the grid", parent=lp,
- level=log.NOISY)
- # We need an upload helper. Check our active uploads again in
- # case there was a race.
- if storage_index in self._active_uploads:
- self.log("upload is currently active", parent=lp)
- uh = self._active_uploads[storage_index]
- else:
- self.log("creating new upload helper", parent=lp)
- uh = self.chk_upload_helper_class(storage_index, self,
- incoming_file, encoding_file,
- r, lp)
- self._active_uploads[storage_index] = uh
- self._add_upload(uh)
- return uh.start()
- d.addCallback(_checked)
+ return (None, uh)
+
+ d = self._check_chk(storage_index, lp)
+ d.addCallback(self._did_chk_check, storage_index, lp)
def _err(f):
self.log("error while checking for chk-already-in-grid",
failure=f, level=log.WEIRD, parent=lp, umid="jDtxZg")
d.addErrback(_err)
return d
- def _check_for_chk_already_in_grid(self, storage_index, results, lp):
+ def _check_chk(self, storage_index, lp):
# see if this file is already in the grid
lp2 = self.log("doing a quick check+UEBfetch",
parent=lp, level=log.NOISY)
- c = CHKCheckerAndUEBFetcher(self.parent.get_permuted_peers,
- storage_index, lp2)
+ sb = self._storage_broker
+ c = CHKCheckerAndUEBFetcher(sb.get_servers_for_psi, storage_index, lp2)
d = c.check()
def _checked(res):
if res:
(sharemap, ueb_data, ueb_hash) = res
self.log("found file in grid", level=log.NOISY, parent=lp)
- results.uri_extension_hash = ueb_hash
- results.sharemap = sharemap
- results.uri_extension_data = ueb_data
- results.preexisting_shares = len(sharemap)
- results.pushed_shares = 0
- return True
- return False
+ hur = upload.HelperUploadResults()
+ hur.uri_extension_hash = ueb_hash
+ hur.sharemap = sharemap
+ hur.uri_extension_data = ueb_data
+ hur.preexisting_shares = len(sharemap)
+ hur.pushed_shares = 0
+ return hur
+ return None
d.addCallback(_checked)
return d
+ def _did_chk_check(self, already_present, storage_index, lp):
+ if already_present:
+ # the necessary results are in the HelperUploadResults from _check_chk
+ self.count("chk_upload_helper.upload_already_present")
+ self.log("file already found in grid", parent=lp)
+ return (already_present, None)
+
+ self.count("chk_upload_helper.upload_need_upload")
+ # the file is not present in the grid, by which we mean there are
+ # less than 'N' shares available.
+ self.log("unable to find file in the grid", parent=lp,
+ level=log.NOISY)
+ # We need an upload helper. Check our active uploads again in
+ # case there was a race.
+ if storage_index in self._active_uploads:
+ self.log("upload is currently active", parent=lp)
+ uh = self._active_uploads[storage_index]
+ else:
+ self.log("creating new upload helper", parent=lp)
+ uh = self._make_chk_upload_helper(storage_index, lp)
+ self._active_uploads[storage_index] = uh
+ self._add_upload(uh)
+ return (None, uh)
+
+ def _make_chk_upload_helper(self, storage_index, lp):
+ si_s = si_b2a(storage_index)
+ incoming_file = os.path.join(self._chk_incoming, si_s)
+ encoding_file = os.path.join(self._chk_encoding, si_s)
+ uh = CHKUploadHelper(storage_index, self,
+ self._storage_broker,
+ self._secret_holder,
+ incoming_file, encoding_file,
+ lp)
+ return uh
+
def _add_upload(self, uh):
self._all_uploads[uh] = None
- s = uh.get_upload_status()
- self._all_upload_statuses[s] = None
- self._recent_upload_statuses.append(s)
- while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
- self._recent_upload_statuses.pop(0)
+ if self._history:
+ s = uh.get_upload_status()
+ self._history.notify_helper_upload(s)
def upload_finished(self, storage_index, size):
# this is called with size=0 if the upload failed
del self._active_uploads[storage_index]
s = uh.get_upload_status()
s.set_active(False)
-
- def get_all_upload_statuses(self):
- return self._all_upload_statuses