from allmydata.util.abbreviate import parse_abbreviated_size
from allmydata.uri import LiteralFileURI
from allmydata.dirnode import NewDirectoryNode
-from allmydata.mutable.filenode import MutableFileNode, MutableWatcher
+from allmydata.mutable.filenode import MutableFileNode
from allmydata.stats import StatsProvider
from allmydata.history import History
from allmydata.interfaces import IURI, INewDirectoryURI, IStatsProducer, \
convergence_s = self.get_or_create_private_config('convergence', _make_secret)
self.convergence = base32.a2b(convergence_s)
self._node_cache = weakref.WeakValueDictionary() # uri -> node
- self.add_service(History())
+ self.add_service(History(self.stats_provider))
self.add_service(Uploader(helper_furl, self.stats_provider))
download_cachedir = os.path.join(self.basedir,
"private", "cache", "download")
self.download_cache = cachedir.CacheDirectoryManager(download_cachedir)
self.download_cache.setServiceParent(self)
self.add_service(Downloader(self.stats_provider))
- self.add_service(MutableWatcher(self.stats_provider))
def _publish(res):
# we publish an empty object so that the introducer can count how
# many clients are connected and see what versions they're
self._node_cache[u_s] = node
return self._node_cache[u_s]
- def notify_publish(self, publish_status, size):
- self.getServiceNamed("mutable-watcher").notify_publish(publish_status,
- size)
- def notify_retrieve(self, retrieve_status):
- self.getServiceNamed("mutable-watcher").notify_retrieve(retrieve_status)
- def notify_mapupdate(self, update_status):
- self.getServiceNamed("mutable-watcher").notify_mapupdate(update_status)
-
def create_empty_dirnode(self):
n = NewDirectoryNode(self)
d = n.create(self._generate_pubprivkeys)
return self.get_history().list_all_download_statuses()
def list_all_mapupdate_statuses(self):
- watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_all_mapupdate_statuses()
+ return self.get_history().list_all_mapupdate_statuses()
def list_all_publish_statuses(self):
- watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_all_publish_statuses()
+ return self.get_history().list_all_publish_statuses()
def list_all_retrieve_statuses(self):
- watcher = self.getServiceNamed("mutable-watcher")
- return watcher.list_all_retrieve_statuses()
+ return self.get_history().list_all_retrieve_statuses()
def list_all_helper_statuses(self):
try:
name = "history"
MAX_DOWNLOAD_STATUSES = 10
MAX_UPLOAD_STATUSES = 10
+ MAX_MAPUPDATE_STATUSES = 20
+ MAX_PUBLISH_STATUSES = 20
+ MAX_RETRIEVE_STATUSES = 20
+
+ def __init__(self, stats_provider=None):
+ self.stats_provider = stats_provider
- def __init__(self):
self.all_downloads_statuses = weakref.WeakKeyDictionary()
self.recent_download_statuses = []
self.all_upload_statuses = weakref.WeakKeyDictionary()
self.recent_upload_statuses = []
+ self.all_mapupdate_status = weakref.WeakKeyDictionary()
+ self.recent_mapupdate_status = []
+ self.all_publish_status = weakref.WeakKeyDictionary()
+ self.recent_publish_status = []
+ self.all_retrieve_status = weakref.WeakKeyDictionary()
+ self.recent_retrieve_status = []
+
+
def add_download(self, download_status):
self.all_downloads_statuses[download_status] = None
self.recent_download_statuses.append(download_status)
def list_all_upload_statuses(self):
for us in self.all_upload_statuses:
yield us
+
+
+
+ def notify_mapupdate(self, p):
+ self.all_mapupdate_status[p] = None
+ self.recent_mapupdate_status.append(p)
+ while len(self.recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
+ self.recent_mapupdate_status.pop(0)
+
+ def notify_publish(self, p, size):
+ self.all_publish_status[p] = None
+ self.recent_publish_status.append(p)
+ if self.stats_provider:
+ self.stats_provider.count('mutable.files_published', 1)
+ # We must be told bytes_published as an argument, since the
+ # publish_status does not yet know how much data it will be asked
+ # to send. When we move to MDMF we'll need to find a better way
+ # to handle this.
+ self.stats_provider.count('mutable.bytes_published', size)
+ while len(self.recent_publish_status) > self.MAX_PUBLISH_STATUSES:
+ self.recent_publish_status.pop(0)
+
+ def notify_retrieve(self, r):
+ self.all_retrieve_status[r] = None
+ self.recent_retrieve_status.append(r)
+ if self.stats_provider:
+ self.stats_provider.count('mutable.files_retrieved', 1)
+ self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
+ while len(self.recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
+ self.recent_retrieve_status.pop(0)
+
+
+ def list_all_mapupdate_statuses(self):
+ for s in self.all_mapupdate_status:
+ yield s
+ def list_all_publish_statuses(self):
+ for s in self.all_publish_status:
+ yield s
+ def list_all_retrieve_statuses(self):
+ for s in self.all_retrieve_status:
+ yield s
+
+
+
def check(self, verify=False):
servermap = ServerMap()
u = ServermapUpdater(self._node, self._monitor, servermap, MODE_CHECK)
- self._node._client.notify_mapupdate(u.get_status())
+ history = self._node._client.get_history()
+ if history:
+ history.notify_mapupdate(u.get_status())
d = u.update()
d.addCallback(self._got_mapupdate_results)
if verify:
-import weakref, random
-from twisted.application import service
+import random
from zope.interface import implements
from twisted.internet import defer, reactor
return self._update_servermap(servermap, mode)
def _update_servermap(self, servermap, mode):
u = ServermapUpdater(self, Monitor(), servermap, mode)
- self._client.notify_mapupdate(u.get_status())
+ history = self._client.get_history()
+ if history:
+ history.notify_mapupdate(u.get_status())
return u.update()
def download_version(self, servermap, version, fetch_privkey=False):
def _try_once_to_download_version(self, servermap, version,
fetch_privkey=False):
r = Retrieve(self, servermap, version, fetch_privkey)
- self._client.notify_retrieve(r.get_status())
+ history = self._client.get_history()
+ if history:
+ history.notify_retrieve(r.get_status())
return r.download()
def upload(self, new_contents, servermap):
def _upload(self, new_contents, servermap):
assert self._pubkey, "update_servermap must be called before publish"
p = Publish(self, servermap)
- self._client.notify_publish(p.get_status(), len(new_contents))
+ history = self._client.get_history()
+ if history:
+ history.notify_publish(p.get_status(), len(new_contents))
return p.publish(new_contents)
-
-
-
-
-class MutableWatcher(service.MultiService):
- MAX_MAPUPDATE_STATUSES = 20
- MAX_PUBLISH_STATUSES = 20
- MAX_RETRIEVE_STATUSES = 20
- name = "mutable-watcher"
-
- def __init__(self, stats_provider=None):
- service.MultiService.__init__(self)
- self.stats_provider = stats_provider
- self._all_mapupdate_status = weakref.WeakKeyDictionary()
- self._recent_mapupdate_status = []
- self._all_publish_status = weakref.WeakKeyDictionary()
- self._recent_publish_status = []
- self._all_retrieve_status = weakref.WeakKeyDictionary()
- self._recent_retrieve_status = []
-
-
- def notify_mapupdate(self, p):
- self._all_mapupdate_status[p] = None
- self._recent_mapupdate_status.append(p)
- while len(self._recent_mapupdate_status) > self.MAX_MAPUPDATE_STATUSES:
- self._recent_mapupdate_status.pop(0)
-
- def notify_publish(self, p, size):
- self._all_publish_status[p] = None
- self._recent_publish_status.append(p)
- if self.stats_provider:
- self.stats_provider.count('mutable.files_published', 1)
- # We must be told bytes_published as an argument, since the
- # publish_status does not yet know how much data it will be asked
- # to send. When we move to MDMF we'll need to find a better way
- # to handle this.
- self.stats_provider.count('mutable.bytes_published', size)
- while len(self._recent_publish_status) > self.MAX_PUBLISH_STATUSES:
- self._recent_publish_status.pop(0)
-
- def notify_retrieve(self, r):
- self._all_retrieve_status[r] = None
- self._recent_retrieve_status.append(r)
- if self.stats_provider:
- self.stats_provider.count('mutable.files_retrieved', 1)
- self.stats_provider.count('mutable.bytes_retrieved', r.get_size())
- while len(self._recent_retrieve_status) > self.MAX_RETRIEVE_STATUSES:
- self._recent_retrieve_status.pop(0)
-
-
- def list_all_mapupdate_statuses(self):
- return self._all_mapupdate_status.keys()
- def list_all_publish_statuses(self):
- return self._all_publish_status.keys()
- def list_all_retrieve_statuses(self):
- return self._all_retrieve_status.keys()
d.addCallback(lambda res: n)
return d
- def notify_retrieve(self, r):
- pass
- def notify_publish(self, p, size):
- pass
- def notify_mapupdate(self, u):
- pass
+ def get_history(self):
+ return None
def create_node_from_uri(self, u):
u = IURI(u)
for us in self.clients[0].list_all_upload_statuses():
if us.get_size() > 200:
self._up_status = us.get_counter()
- rs = self.clients[0].list_all_retrieve_statuses()[0]
+ rs = list(self.clients[0].list_all_retrieve_statuses())[0]
self._retrieve_status = rs.get_counter()
- ps = self.clients[0].list_all_publish_statuses()[0]
+ ps = list(self.clients[0].list_all_publish_statuses())[0]
self._publish_status = ps.get_counter()
- us = self.clients[0].list_all_mapupdate_statuses()[0]
+ us = list(self.clients[0].list_all_mapupdate_statuses())[0]
self._update_status = us.get_counter()
# and that there are some upload- and download- status pages