d.addCallback(make_key_objs)
return d
else:
- # RSA key generation for a 2048 bit key takes between 0.8 and 3.2 secs
+ # RSA key generation for a 2048 bit key takes between 0.8 and
+ # 3.2 secs
signer = rsa.generate(key_size)
verifier = signer.get_verifying_key()
return verifier, signer
return uploader.upload(uploadable)
- def list_all_uploads(self):
+ def list_all_upload_statuses(self):
uploader = self.getServiceNamed("uploader")
- return uploader.list_all_uploads()
- def list_active_uploads(self):
- uploader = self.getServiceNamed("uploader")
- return uploader.list_active_uploads()
- def list_recent_uploads(self):
- uploader = self.getServiceNamed("uploader")
- return uploader.list_recent_uploads()
+ return uploader.list_all_upload_statuses()
- def list_all_downloads(self):
- downloader = self.getServiceNamed("downloader")
- return downloader.list_all_downloads()
- def list_active_downloads(self):
- downloader = self.getServiceNamed("downloader")
- return downloader.list_active_downloads()
- def list_recent_downloads(self):
+ def list_all_download_statuses(self):
downloader = self.getServiceNamed("downloader")
- return downloader.list_recent_downloads()
-
- def list_all_mapupdate(self):
- watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_all_mapupdate()
- def list_active_mapupdate(self):
- watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_active_mapupdate()
- def list_recent_mapupdate(self):
- watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_recent_mapupdate()
+ return downloader.list_all_download_statuses()
- def list_all_publish(self):
+ def list_all_mapupdate_statuses(self):
watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_all_publish()
- def list_active_publish(self):
+ return watcher.list_all_mapupdate_statuses()
+ def list_all_publish_statuses(self):
watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_active_publish()
- def list_recent_publish(self):
+ return watcher.list_all_publish_statuses()
+ def list_all_retrieve_statuses(self):
watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_recent_publish()
+ return watcher.list_all_retrieve_statuses()
- def list_all_retrieve(self):
- watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_all_retrieve()
- def list_active_retrieve(self):
- watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_active_retrieve()
- def list_recent_retrieve(self):
- watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_recent_retrieve()
-
- def list_active_helper_statuses(self):
- try:
- helper = self.getServiceNamed("helper")
- except KeyError:
- return []
- return helper.get_active_upload_statuses()
- def list_recent_helper_statuses(self):
+ def list_all_helper_statuses(self):
try:
helper = self.getServiceNamed("helper")
except KeyError:
return []
- return helper.get_recent_upload_statuses()
+ return helper.get_all_upload_statuses()
def __init__(self, stats_provider=None):
service.MultiService.__init__(self)
- self._all_downloads = weakref.WeakKeyDictionary()
self.stats_provider = stats_provider
- self._recent_download_status = []
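+ # Status tracking: the WeakKeyDictionaries let us enumerate every
+ # download (and its status object) that is still alive without
+ # keeping it alive ourselves; _recent_download_statuses holds strong
+ # references to the last MAX_DOWNLOAD_STATUSES so recently-finished
+ # downloads remain visible on the status page.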
+ self._all_downloads = weakref.WeakKeyDictionary() # for debugging
+ self._all_download_statuses = weakref.WeakKeyDictionary()
+ self._recent_download_statuses = []
def download(self, u, t):
assert self.parent
dl = FileDownloader(self.parent, u, t)
else:
raise RuntimeError("I don't know how to download a %s" % u)
- self._all_downloads[dl] = None
- self._recent_download_status.append(dl.get_download_status())
- while len(self._recent_download_status) > self.MAX_DOWNLOAD_STATUSES:
- self._recent_download_status.pop(0)
+ self._add_download(dl)
d = dl.start()
return d
def download_to_filehandle(self, uri, filehandle):
return self.download(uri, FileHandle(filehandle))
-
- def list_all_downloads(self):
- return self._all_downloads.keys()
- def list_active_downloads(self):
- return [d.get_download_status() for d in self._all_downloads.keys()
- if d.get_download_status().get_active()]
- def list_recent_downloads(self):
- return self._recent_download_status
+ def _add_download(self, downloader):
+ self._all_downloads[downloader] = None
+ s = downloader.get_download_status()
+ self._all_download_statuses[s] = None
+ self._recent_download_statuses.append(s)
+ while len(self._recent_download_statuses) > self.MAX_DOWNLOAD_STATUSES:
+ self._recent_download_statuses.pop(0)
+
+ def list_all_download_statuses(self):
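+ # return every DownloadStatus that is still referenced: those for
+ # active downloads plus the recent ones retained by _add_download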
+ for ds in self._all_download_statuses:
+ yield ds
while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
self._recent_mapupdate_status.pop(0)
- def list_all_mapupdate(self):
- return self._all_mapupdate_status.keys()
- def list_active_mapupdate(self):
- return [p for p in self._all_mapupdate_status.keys() if p.get_active()]
- def list_recent_mapupdate(self):
- return self._recent_mapupdate_status
-
-
def notify_publish(self, p):
self._all_publish_status[p] = None
self._recent_publish_status.append(p)
while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
self._recent_publish_status.pop(0)
- def list_all_publish(self):
- return self._all_publish_status.keys()
- def list_active_publish(self):
- return [p for p in self._all_publish_status.keys() if p.get_active()]
- def list_recent_publish(self):
- return self._recent_publish_status
-
-
def notify_retrieve(self, r):
self._all_retrieve_status[r] = None
self._recent_retrieve_status.append(r)
while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
self._recent_retrieve_status.pop(0)
- def list_all_retrieve(self):
+
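+ # these return every status object currently tracked; the notify_*
+ # methods above retain the most recent MAX_*_STATUSES of each kind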
+ def list_all_mapupdate_statuses(self):
+ return self._all_mapupdate_status.keys()
+ def list_all_publish_statuses(self):
+ return self._all_publish_status.keys()
+ def list_all_retrieve_statuses(self):
return self._all_retrieve_status.keys()
- def list_active_retrieve(self):
- return [p for p in self._all_retrieve_status.keys() if p.get_active()]
- def list_recent_retrieve(self):
- return self._recent_retrieve_status
self._status.set_helper(False)
self._status.set_progress(0.0)
self._status.set_active(True)
- self._status.set_servermap(servermap)
def log(self, *args, **kwargs):
if 'parent' not in kwargs:
# initial publish
self._new_seqnum = 1
self._servermap = ServerMap()
+ self._status.set_servermap(self._servermap)
self.log(format="new seqnum will be %(seqnum)d",
seqnum=self._new_seqnum, level=log.NOISY)
-import os, stat, time
+import os, stat, time, weakref
from zope.interface import implements
from twisted.application import service
from twisted.internet import defer
fileutil.make_dirs(self._chk_incoming)
fileutil.make_dirs(self._chk_encoding)
self._active_uploads = {}
+ self._all_uploads = weakref.WeakKeyDictionary() # for debugging
+ self._all_upload_statuses = weakref.WeakKeyDictionary()
self._recent_upload_statuses = []
self.stats_provider = stats_provider
if stats_provider:
incoming_file, encoding_file,
r, lp)
self._active_uploads[storage_index] = uh
+ self._add_upload(uh)
return uh.start()
d.addCallback(_checked)
def _err(f):
d.addCallback(_checked)
return d
+ def _add_upload(self, uh):
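+ # record the upload helper and its status: weak references for
+ # enumeration, plus a bounded _recent_upload_statuses list that
+ # keeps the last MAX_UPLOAD_STATUSES alive after the upload finishes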
+ self._all_uploads[uh] = None
+ s = uh.get_upload_status()
+ self._all_upload_statuses[s] = None
+ self._recent_upload_statuses.append(s)
+ while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
+ self._recent_upload_statuses.pop(0)
+
def upload_finished(self, storage_index, size):
+ # this is called with size=0 if the upload failed
self.count("chk_upload_helper.encoded_bytes", size)
uh = self._active_uploads[storage_index]
del self._active_uploads[storage_index]
s = uh.get_upload_status()
s.set_active(False)
- self._recent_upload_statuses.append(s)
- while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
- self._recent_upload_statuses.pop(0)
-
- def get_active_upload_statuses(self):
- return [u.get_upload_status() for u in self._active_uploads.values()]
- def get_recent_upload_statuses(self):
- return self._recent_upload_statuses
+ def get_all_upload_statuses(self):
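+ # iterating the WeakKeyDictionary yields every upload status object
+ # that is still alive (active uploads plus the retained recent ones)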
+ return self._all_upload_statuses
def _got_status(res):
# find an interesting upload and download to look at. LIT files
# are not interesting.
- for dl in self.clients[0].list_recent_downloads():
- if dl.get_size() > 200:
- self._down_status = dl.get_counter()
- for ul in self.clients[0].list_recent_uploads():
- if ul.get_size() > 200:
- self._up_status = ul.get_counter()
- rs = self.clients[0].list_recent_retrieve()[0]
+ for ds in self.clients[0].list_all_download_statuses():
+ if ds.get_size() > 200:
+ self._down_status = ds.get_counter()
+ for us in self.clients[0].list_all_upload_statuses():
+ if us.get_size() > 200:
+ self._up_status = us.get_counter()
+ rs = self.clients[0].list_all_retrieve_statuses()[0]
self._retrieve_status = rs.get_counter()
- ps = self.clients[0].list_recent_publish()[0]
+ ps = self.clients[0].list_all_publish_statuses()[0]
self._publish_status = ps.get_counter()
+ ms = self.clients[0].list_all_mapupdate_statuses()[0]
+ self._update_status = ms.get_counter()
# and that there are some upload- and download- status pages
return self.GET("status/up-%d" % self._up_status)
return self.GET("status/down-%d" % self._down_status)
d.addCallback(_got_up)
def _got_down(res):
- return self.GET("status/publish-%d" % self._publish_status)
+ return self.GET("status/mapupdate-%d" % self._update_status)
d.addCallback(_got_down)
+ def _got_update(res):
+ return self.GET("status/publish-%d" % self._publish_status)
+ d.addCallback(_got_update)
def _got_publish(res):
return self.GET("status/retrieve-%d" % self._retrieve_status)
d.addCallback(_got_publish)
from allmydata import interfaces, provisioning, uri, webish, upload, download
from allmydata.web import status
from allmydata.util import fileutil
-from allmydata.test.common import FakeDirectoryNode, FakeCHKFileNode, FakeMutableFileNode, create_chk_filenode
-from allmydata.interfaces import IURI, INewDirectoryURI, IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode
+from allmydata.test.common import FakeDirectoryNode, FakeCHKFileNode, \
+ FakeMutableFileNode, create_chk_filenode
+from allmydata.interfaces import IURI, INewDirectoryURI, \
+ IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode
+from allmydata.mutable import servermap, publish, retrieve
# create a fake uploader/downloader, and a couple of fake dirnodes, then
# create a webserver that works against them
introducer_client = FakeIntroducerClient()
_all_upload_status = [upload.UploadStatus()]
_all_download_status = [download.DownloadStatus()]
+ _all_mapupdate_statuses = [servermap.UpdateStatus()]
+ _all_publish_statuses = [publish.PublishStatus()]
+ _all_retrieve_statuses = [retrieve.RetrieveStatus()]
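+ # pre-populate one status object of each kind so the status-page
+ # tests below have something to find and render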
convergence = "some random string"
def connected_to_introducer(self):
d.addCallback(_got_data)
return d
- def list_all_uploads(self):
- return []
- def list_all_downloads(self):
- return []
-
- def list_active_uploads(self):
- return self._all_upload_status
- def list_active_downloads(self):
- return self._all_download_status
- def list_active_publish(self):
- return []
- def list_active_retrieve(self):
- return []
- def list_active_mapupdate(self):
- return []
- def list_recent_mapupdate(self):
- return []
-
- def list_recent_uploads(self):
+ def list_all_upload_statuses(self):
return self._all_upload_status
- def list_recent_downloads(self):
+ def list_all_download_statuses(self):
return self._all_download_status
- def list_recent_publish(self):
- return []
- def list_recent_retrieve(self):
- return []
- def list_active_helper_statuses(self):
- return []
- def list_recent_helper_statuses(self):
+ def list_all_mapupdate_statuses(self):
+ return self._all_mapupdate_statuses
+ def list_all_publish_statuses(self):
+ return self._all_publish_statuses
+ def list_all_retrieve_statuses(self):
+ return self._all_retrieve_statuses
+ def list_all_helper_statuses(self):
return []
class WebMixin(object):
return d
def test_status(self):
- dl_num = self.s.list_recent_downloads()[0].get_counter()
- ul_num = self.s.list_recent_uploads()[0].get_counter()
+ dl_num = self.s.list_all_download_statuses()[0].get_counter()
+ ul_num = self.s.list_all_upload_statuses()[0].get_counter()
+ mu_num = self.s.list_all_mapupdate_statuses()[0].get_counter()
+ pub_num = self.s.list_all_publish_statuses()[0].get_counter()
+ ret_num = self.s.list_all_retrieve_statuses()[0].get_counter()
d = self.GET("/status", followRedirect=True)
def _check(res):
self.failUnless('Upload and Download Status' in res, res)
self.failUnless('"down-%d"' % dl_num in res, res)
self.failUnless('"up-%d"' % ul_num in res, res)
+ self.failUnless('"mapupdate-%d"' % mu_num in res, res)
+ self.failUnless('"publish-%d"' % pub_num in res, res)
+ self.failUnless('"retrieve-%d"' % ret_num in res, res)
d.addCallback(_check)
d.addCallback(lambda res: self.GET("/status/down-%d" % dl_num))
def _check_dl(res):
def _check_ul(res):
self.failUnless("File Upload Status" in res, res)
d.addCallback(_check_ul)
+ d.addCallback(lambda res: self.GET("/status/mapupdate-%d" % mu_num))
+ def _check_mapupdate(res):
+ self.failUnless("Mutable File Servermap Update Status" in res, res)
+ d.addCallback(_check_mapupdate)
+ d.addCallback(lambda res: self.GET("/status/publish-%d" % pub_num))
+ def _check_publish(res):
+ self.failUnless("Mutable File Publish Status" in res, res)
+ d.addCallback(_check_publish)
+ d.addCallback(lambda res: self.GET("/status/retrieve-%d" % ret_num))
+ def _check_retrieve(res):
+ self.failUnless("Mutable File Retrieve Status" in res, res)
+ d.addCallback(_check_retrieve)
+
return d
def test_status_numbers(self):
self._helper_furl = helper_furl
self.stats_provider = stats_provider
self._helper = None
- self._all_uploads = weakref.WeakKeyDictionary()
- self._recent_upload_status = []
+ self._all_uploads = weakref.WeakKeyDictionary() # for debugging
+ self._all_upload_statuses = weakref.WeakKeyDictionary()
+ self._recent_upload_statuses = []
service.MultiService.__init__(self)
def startService(self):
uploader = AssistedUploader(self._helper)
else:
uploader = self.uploader_class(self.parent)
- self._all_uploads[uploader] = None
- self._recent_upload_status.append(uploader.get_upload_status())
- while len(self._recent_upload_status) > self.MAX_UPLOAD_STATUSES:
- self._recent_upload_status.pop(0)
+ self._add_upload(uploader)
return uploader.start(uploadable)
d.addCallback(_got_size)
def _done(res):
d.addBoth(_done)
return d
- def list_all_uploads(self):
- return self._all_uploads.keys()
- def list_active_uploads(self):
- return [u.get_upload_status() for u in self._all_uploads.keys()
- if u.get_upload_status().get_active()]
- def list_recent_uploads(self):
- return self._recent_upload_status
+ def _add_upload(self, uploader):
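+ # track the uploader and its status weakly; the bounded
+ # _recent_upload_statuses list keeps the last MAX_UPLOAD_STATUSES
+ # alive so recently-finished uploads still appear in
+ # list_all_upload_statuses()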
+ s = uploader.get_upload_status()
+ self._all_uploads[uploader] = None
+ self._all_upload_statuses[s] = None
+ self._recent_upload_statuses.append(s)
+ while len(self._recent_upload_statuses) > self.MAX_UPLOAD_STATUSES:
+ self._recent_upload_statuses.pop(0)
+
+ def list_all_upload_statuses(self):
+ for us in self._all_upload_statuses:
+ yield us
-import time, pprint
+import time, pprint, itertools
import simplejson
from twisted.internet import defer
from nevow import rend, inevow, tags as T
d = self.upload_results()
def _convert(r):
file_size = r.file_size
- if file_size is None:
- return None
time1 = r.timings.get("cumulative_encoding")
- if time1 is None:
- return None
time2 = r.timings.get("cumulative_sending")
- if time2 is None:
+ if file_size is None or time1 is None or time2 is None:
return None
try:
return 1.0 * file_size / (time1+time2)
d = self.upload_results()
def _convert(r):
fetch_size = r.ciphertext_fetched
- if fetch_size is None:
- return None
time = r.timings.get("cumulative_fetch")
- if time is None:
+ if fetch_size is None or time is None:
return None
try:
return 1.0 * fetch_size / time
def data_rate_total(self, ctx, data):
return self._get_rate(data, "total")
- def data_time_peer_selection(self, ctx, data):
- return self.retrieve_status.timings.get("peer_selection")
-
def data_time_fetch(self, ctx, data):
return self.retrieve_status.timings.get("fetch")
def data_rate_fetch(self, ctx, data):
return self._get_rate(data, "fetch")
- def data_time_cumulative_verify(self, ctx, data):
- return self.retrieve_status.timings.get("cumulative_verify")
-
def data_time_decode(self, ctx, data):
return self.retrieve_status.timings.get("decode")
def data_rate_decode(self, ctx, data):
docFactory = getxmlfile("status.xhtml")
addSlash = True
+ def _get_all_statuses(self, client):
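+ # chain every kind of status (upload, download, mapupdate, publish,
+ # retrieve, helper) into a single iterable; the active/recent
+ # renderers below filter it by get_active()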
+ return itertools.chain(client.list_all_upload_statuses(),
+ client.list_all_download_statuses(),
+ client.list_all_mapupdate_statuses(),
+ client.list_all_publish_statuses(),
+ client.list_all_retrieve_statuses(),
+ client.list_all_helper_statuses(),
+ )
+
def data_active_operations(self, ctx, data):
client = IClient(ctx)
- active = (client.list_active_uploads() +
- client.list_active_downloads() +
- client.list_active_mapupdate() +
- client.list_active_publish() +
- client.list_active_retrieve() +
- client.list_active_helper_statuses())
+ active = [s
+ for s in self._get_all_statuses(client)
+ if s.get_active()]
return active
def data_recent_operations(self, ctx, data):
client = IClient(ctx)
- recent = [o for o in (client.list_recent_uploads() +
- client.list_recent_downloads() +
- client.list_recent_mapupdate() +
- client.list_recent_publish() +
- client.list_recent_retrieve() +
- client.list_recent_helper_statuses())
- if not o.get_active()]
+ recent = [s
+ for s in self._get_all_statuses(client)
+ if not s.get_active()]
recent.sort(lambda a,b: cmp(a.get_started(), b.get_started()))
recent.reverse()
return recent
stype,count_s = name.split("-")
count = int(count_s)
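+ # status pages are addressed as "<type>-<counter>"; scan the
+ # matching status list for the object whose counter matches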
if stype == "up":
- for s in client.list_recent_uploads():
- if s.get_counter() == count:
- return UploadStatusPage(s)
- for u in client.list_all_uploads():
- # u is an uploader object
- s = u.get_upload_status()
+ for s in itertools.chain(client.list_all_upload_statuses(),
+ client.list_all_helper_statuses()):
+ # immutable-upload helpers use the same status objects as
+ # regular immutable uploads, so both can be rendered by
+ # UploadStatusPage
if s.get_counter() == count:
return UploadStatusPage(s)
- for s in (client.list_active_helper_statuses() +
- client.list_recent_helper_statuses()):
- if s.get_counter() == count:
- # immutable-upload helpers use the same status object as
- # a regular immutable-upload
- return UploadStatusPage(s)
if stype == "down":
- for s in client.list_recent_downloads():
- if s.get_counter() == count:
- return DownloadStatusPage(s)
- for d in client.list_all_downloads():
- s = d.get_download_status()
+ for s in client.list_all_download_statuses():
if s.get_counter() == count:
return DownloadStatusPage(s)
if stype == "mapupdate":
- for s in client.list_recent_mapupdate():
- if s.get_counter() == count:
- return MapupdateStatusPage(s)
- for p in client.list_all_mapupdate():
- s = p.get_status()
+ for s in client.list_all_mapupdate_statuses():
if s.get_counter() == count:
return MapupdateStatusPage(s)
if stype == "publish":
- for s in client.list_recent_publish():
- if s.get_counter() == count:
- return PublishStatusPage(s)
- for p in client.list_all_publish():
- s = p.get_status()
+ for s in client.list_all_publish_statuses():
if s.get_counter() == count:
return PublishStatusPage(s)
if stype == "retrieve":
- for s in client.list_recent_retrieve():
- if s.get_counter() == count:
- return RetrieveStatusPage(s)
- for r in client.list_all_retrieve():
- s = r.get_status()
+ for s in client.list_all_retrieve_statuses():
if s.get_counter() == count:
return RetrieveStatusPage(s)