-import os.path, re, urllib, time
+import os.path, re, urllib, time, cgi
import simplejson
from StringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.internet.task import Clock
-from twisted.web import client, error, http, html
+from twisted.web import client, error, http
from twisted.python import failure, log
from foolscap.api import fireEventually, flushEventualQueue
class FakeUploader(service.Service):
name = "uploader"
+ helper_furl = None
+ helper_connected = False
+
def upload(self, uploadable):
d = uploadable.get_size()
d.addCallback(lambda size: uploadable.read(size))
return ur
d.addCallback(_got_data)
return d
+
def get_helper_info(self):
- return (None, False)
+ return (self.helper_furl, self.helper_connected)
+
def build_one_ds():
ds = DownloadStatus("storage_index", 1234)
def list_all_helper_statuses(self):
return []
+class FakeDisplayableServer(StubServer):
+ def __init__(self, serverid, nickname):
+ StubServer.__init__(self, serverid)
+ self.announcement = {"my-version": "allmydata-tahoe-fake",
+ "service-name": "storage",
+ "nickname": nickname}
+ def is_connected(self):
+ return True
+ def get_permutation_seed(self):
+ return ""
+ def get_remote_host(self):
+ return ""
+ def get_last_loss_time(self):
+ return None
+ def get_announcement_time(self):
+ return None
+ def get_announcement(self):
+ return self.announcement
+ def get_nickname(self):
+ return self.announcement["nickname"]
+
+class FakeBucketCounter(object):
+ def get_state(self):
+ return {"last-complete-bucket-count": 0}
+ def get_progress(self):
+ return {"estimated-time-per-cycle": 0,
+ "cycle-in-progress": False,
+ "remaining-wait-time": 0}
+
+class FakeLeaseChecker(object):
+ def __init__(self):
+ self.expiration_enabled = False
+ self.mode = "age"
+ self.override_lease_duration = None
+ self.sharetypes_to_expire = {}
+ def get_state(self):
+ return {"history": None}
+ def get_progress(self):
+ return {"estimated-time-per-cycle": 0,
+ "cycle-in-progress": False,
+ "remaining-wait-time": 0}
+
+class FakeStorageServer(service.MultiService):
+ name = 'storage'
+ def __init__(self, nodeid, nickname):
+ service.MultiService.__init__(self)
+ self.my_nodeid = nodeid
+ self.nickname = nickname
+ self.bucket_counter = FakeBucketCounter()
+ self.lease_checker = FakeLeaseChecker()
+ def get_stats(self):
+ return {"storage_server.accepting_immutable_shares": False}
+
class FakeClient(Client):
def __init__(self):
# don't upcall to Client.__init__, since we only want to initialize a
service.MultiService.__init__(self)
self.all_contents = {}
self.nodeid = "fake_nodeid"
- self.nickname = "fake_nickname"
+ self.nickname = u"fake_nickname \u263A"
self.introducer_furl = "None"
self.stats_provider = FakeStatsProvider()
self._secret_holder = SecretHolder("lease secret", "convergence secret")
self.helper = None
self.convergence = "some random string"
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
+ # fake knowledge of another server
+ self.storage_broker.test_add_server("other_nodeid",
+ FakeDisplayableServer("other_nodeid", u"other_nickname \u263B"))
self.introducer_client = None
self.history = FakeHistory()
self.uploader = FakeUploader()
None, None, None)
self.nodemaker.all_contents = self.all_contents
self.mutable_file_default = SDMF_VERSION
+ self.addService(FakeStorageServer(self.nodeid, self.nickname))
def startService(self):
return service.MultiService.startService(self)
self._htmlname_raw = self._htmlname_unicode.encode('utf-8')
self._htmlname_urlencoded = urllib.quote(self._htmlname_raw, '')
self._htmlname_escaped = escapeToXML(self._htmlname_raw)
- self._htmlname_escaped_attr = html.escape(self._htmlname_raw)
- self._htmlname_escaped_double = escapeToXML(html.escape(self._htmlname_raw))
+ self._htmlname_escaped_attr = cgi.escape(self._htmlname_raw, quote=True)
+ self._htmlname_escaped_double = escapeToXML(cgi.escape(self._htmlname_raw, quote=True))
self.HTMLNAME_CONTENTS, n, self._htmlname_txt_uri = self.makefile(0)
foo.set_uri(self._htmlname_unicode, self._htmlname_txt_uri, self._htmlname_txt_uri)
def test_welcome(self):
d = self.GET("/")
def _check(res):
- self.failUnlessIn('Welcome to Tahoe-LAFS', res)
+ self.failUnlessIn('<title>Tahoe-LAFS - Welcome</title>', res)
self.failUnlessIn(FAVICON_MARKUP, res)
- self.failUnlessIn('href="https://tahoe-lafs.org/"', res)
+ self.failUnlessIn('<a href="status">Recent and Active Operations</a>', res)
+ self.failUnlessIn('<a href="statistics">Operational Statistics</a>', res)
+ self.failUnlessIn('<input type="hidden" name="t" value="report-incident" />', res)
+ res_u = res.decode('utf-8')
+ self.failUnlessIn(u'<td>fake_nickname \u263A</td>', res_u)
+ self.failUnlessIn(u'<div class="nickname">other_nickname \u263B</div>', res_u)
+ self.failUnlessIn(u'\u00A9 <a href="https://tahoe-lafs.org/">Tahoe-LAFS Software Foundation', res_u)
self.s.basedir = 'web/test_welcome'
fileutil.make_dirs("web/test_welcome")
d.addCallback(_check)
return d
+ def test_helper_status(self):
+ d = defer.succeed(None)
+
+ # set helper furl to None
+ def _set_no_helper(ign):
+ self.s.uploader.helper_furl = None
+ return self.GET("/")
+ d.addCallback(_set_no_helper)
+ def _check_no_helper(res):
+ html = res.replace('\n', ' ')
+ self.failUnless(re.search('<div class="status-indicator connected-not-configured"></div>[ ]*Helper', html), res)
+ d.addCallback(_check_no_helper)
+
+ # enable helper, not connected
+ def _set_helper_not_connected(ign):
+ self.s.uploader.helper_furl = "pb://someHelper"
+ self.s.uploader.helper_connected = False
+ return self.GET("/")
+ d.addCallback(_set_helper_not_connected)
+ def _check_helper_not_connected(res):
+ html = res.replace('\n', ' ')
+ self.failUnless(re.search('<div class="status-indicator connected-no"></div>[ ]*Helper', html), res)
+ d.addCallback(_check_helper_not_connected)
+
+ # enable helper, connected
+ def _set_helper_connected(ign):
+ self.s.uploader.helper_furl = "pb://someHelper"
+ self.s.uploader.helper_connected = True
+ return self.GET("/")
+ d.addCallback(_set_helper_connected)
+ def _check_helper_connected(res):
+ html = res.replace('\n', ' ')
+ self.failUnless(re.search('<div class="status-indicator connected-yes"></div>[ ]*Helper', html), res)
+ d.addCallback(_check_helper_connected)
+ return d
+
+ def test_storage(self):
+ d = self.GET("/storage")
+ def _check(res):
+ self.failUnlessIn('Storage Server Status', res)
+ self.failUnlessIn(FAVICON_MARKUP, res)
+ res_u = res.decode('utf-8')
+ self.failUnlessIn(u'<li>Server Nickname: <span class="nickname mine">fake_nickname \u263A</span></li>', res_u)
+ d.addCallback(_check)
+ return d
+
def test_status(self):
h = self.s.get_history()
dl_num = h.list_all_download_statuses()[0].get_counter()
ret_num = h.list_all_retrieve_statuses()[0].get_counter()
d = self.GET("/status", followRedirect=True)
def _check(res):
- self.failUnlessIn('Upload and Download Status', res)
+ self.failUnlessIn('Recent and Active Operations', res)
self.failUnlessIn('"down-%d"' % dl_num, res)
self.failUnlessIn('"up-%d"' % ul_num, res)
self.failUnlessIn('"mapupdate-%d"' % mu_num, res)
self.failUnlessReallyEqual(statuscode, str(http.FOUND))
self.failUnless(target.startswith(self.webish_url), target)
return client.getPage(target, method="GET")
+ # We encode "uri" as "%75ri" to exercise a case affected by ticket #1860.
d = self.shouldRedirect2("test_POST_upload_no_link_whendone_results",
check,
self.POST, "/uri", t="upload",
- when_done="/uri/%(uri)s",
+ when_done="/%75ri/%(uri)s",
file=("new.txt", self.NEWFILE_CONTENTS))
d.addCallback(lambda res:
self.failUnlessReallyEqual(res, self.NEWFILE_CONTENTS))
d = self.GET("/")
def _after_get_welcome_page(res):
MKDIR_BUTTON_RE = re.compile(
- '<form action="([^"]*)" method="post".*?'
- '<input type="hidden" name="t" value="([^"]*)" />'
- '<input type="hidden" name="([^"]*)" value="([^"]*)" />'
- '<input type="submit" value="Create a directory" />',
- re.I)
- mo = MKDIR_BUTTON_RE.search(res)
+ '<form action="([^"]*)" method="post".*'
+ '<input type="hidden" name="t" value="([^"]*)" />[ ]*'
+ '<input type="hidden" name="([^"]*)" value="([^"]*)" />[ ]*'
+ '<input type="submit" class="btn" value="Create a directory[^"]*" />')
+ html = res.replace('\n', ' ')
+ mo = MKDIR_BUTTON_RE.search(html)
+ self.failUnless(mo, html)
formaction = mo.group(1)
formt = mo.group(2)
formaname = mo.group(3)