-import os.path, re, urllib, time
+import os.path, re, urllib, time, cgi
import simplejson
from StringIO import StringIO
from twisted.trial import unittest
from twisted.internet import defer, reactor
from twisted.internet.task import Clock
-from twisted.web import client, error, http, html
+from twisted.web import client, error, http
from twisted.python import failure, log
from foolscap.api import fireEventually, flushEventualQueue
unknown_immcap = u"imm.lafs://immutable_from_the_future_imm_\u263A".encode('utf-8')
FAVICON_MARKUP = '<link href="/icon.png" rel="shortcut icon" />'
-
+DIR_HTML_TAG = '<html lang="en">'
class FakeStatsProvider:
def get_stats(self):
class FakeUploader(service.Service):
name = "uploader"
+ helper_furl = None
+ helper_connected = False
+
def upload(self, uploadable):
d = uploadable.get_size()
d.addCallback(lambda size: uploadable.read(size))
return ur
d.addCallback(_got_data)
return d
+
def get_helper_info(self):
- return (None, False)
+ return (self.helper_furl, self.helper_connected)
+
def build_one_ds():
ds = DownloadStatus("storage_index", 1234)
def list_all_helper_statuses(self):
return []
+class FakeDisplayableServer(StubServer):
+ def __init__(self, serverid, nickname, connected,
+ last_connect_time, last_loss_time, last_rx_time):
+ StubServer.__init__(self, serverid)
+ self.announcement = {"my-version": "allmydata-tahoe-fake",
+ "service-name": "storage",
+ "nickname": nickname}
+ self.connected = connected
+ self.last_loss_time = last_loss_time
+ self.last_rx_time = last_rx_time
+ self.last_connect_time = last_connect_time
+ def is_connected(self):
+ return self.connected
+ def get_permutation_seed(self):
+ return ""
+ def get_remote_host(self):
+ return ""
+ def get_last_loss_time(self):
+ return self.last_loss_time
+ def get_last_received_data_time(self):
+ return self.last_rx_time
+ def get_last_connect_time(self):
+ return self.last_connect_time
+ def get_announcement(self):
+ return self.announcement
+ def get_nickname(self):
+ return self.announcement["nickname"]
+ def get_available_space(self):
+ return 123456
+
+class FakeBucketCounter(object):
+ def get_state(self):
+ return {"last-complete-bucket-count": 0}
+ def get_progress(self):
+ return {"estimated-time-per-cycle": 0,
+ "cycle-in-progress": False,
+ "remaining-wait-time": 0}
+
+class FakeLeaseChecker(object):
+ def __init__(self):
+ self.expiration_enabled = False
+ self.mode = "age"
+ self.override_lease_duration = None
+ self.sharetypes_to_expire = {}
+ def get_state(self):
+ return {"history": None}
+ def get_progress(self):
+ return {"estimated-time-per-cycle": 0,
+ "cycle-in-progress": False,
+ "remaining-wait-time": 0}
+
+class FakeStorageServer(service.MultiService):
+ name = 'storage'
+ def __init__(self, nodeid, nickname):
+ service.MultiService.__init__(self)
+ self.my_nodeid = nodeid
+ self.nickname = nickname
+ self.bucket_counter = FakeBucketCounter()
+ self.lease_checker = FakeLeaseChecker()
+ def get_stats(self):
+ return {"storage_server.accepting_immutable_shares": False}
+
class FakeClient(Client):
def __init__(self):
# don't upcall to Client.__init__, since we only want to initialize a
service.MultiService.__init__(self)
self.all_contents = {}
self.nodeid = "fake_nodeid"
- self.nickname = "fake_nickname"
+ self.nickname = u"fake_nickname \u263A"
self.introducer_furl = "None"
self.stats_provider = FakeStatsProvider()
self._secret_holder = SecretHolder("lease secret", "convergence secret")
self.helper = None
self.convergence = "some random string"
self.storage_broker = StorageFarmBroker(None, permute_peers=True)
+ # fake knowledge of another server
+ self.storage_broker.test_add_server("other_nodeid",
+ FakeDisplayableServer(
+ serverid="other_nodeid", nickname=u"other_nickname \u263B", connected = True,
+ last_connect_time = 10, last_loss_time = 20, last_rx_time = 30))
+ self.storage_broker.test_add_server("disconnected_nodeid",
+ FakeDisplayableServer(
+ serverid="other_nodeid", nickname=u"disconnected_nickname \u263B", connected = False,
+ last_connect_time = 15, last_loss_time = 25, last_rx_time = 35))
self.introducer_client = None
self.history = FakeHistory()
self.uploader = FakeUploader()
None, None, None)
self.nodemaker.all_contents = self.all_contents
self.mutable_file_default = SDMF_VERSION
+ self.addService(FakeStorageServer(self.nodeid, self.nickname))
+
+ def get_long_nodeid(self):
+ return "v0-nodeid"
+ def get_long_tubid(self):
+ return "tubid"
def startService(self):
return service.MultiService.startService(self)
MUTABLE_SIZELIMIT = FakeMutableFileNode.MUTABLE_SIZELIMIT
-class WebMixin(object):
+class WebMixin(testutil.TimezoneMixin):
def setUp(self):
+ self.setTimezone('UTC-13:00')
self.s = FakeClient()
self.s.startService()
self.staticdir = self.mktemp()
self.clock = Clock()
+ self.fakeTime = 86460 # 1d 0h 1m 0s
self.ws = webish.WebishServer(self.s, "0", staticdir=self.staticdir,
- clock=self.clock)
+ clock=self.clock, now_fn=lambda:self.fakeTime)
self.ws.setServiceParent(self.s)
self.webish_port = self.ws.getPortnum()
self.webish_url = self.ws.getURL()
self._htmlname_raw = self._htmlname_unicode.encode('utf-8')
self._htmlname_urlencoded = urllib.quote(self._htmlname_raw, '')
self._htmlname_escaped = escapeToXML(self._htmlname_raw)
- self._htmlname_escaped_attr = html.escape(self._htmlname_raw)
- self._htmlname_escaped_double = escapeToXML(html.escape(self._htmlname_raw))
+ self._htmlname_escaped_attr = cgi.escape(self._htmlname_raw, quote=True)
+ self._htmlname_escaped_double = escapeToXML(cgi.escape(self._htmlname_raw, quote=True))
self.HTMLNAME_CONTENTS, n, self._htmlname_txt_uri = self.makefile(0)
foo.set_uri(self._htmlname_unicode, self._htmlname_txt_uri, self._htmlname_txt_uri)
res.trap(expected_failure)
if substring:
self.failUnlessIn(substring, str(res),
- "'%s' not in '%s' for test '%s'" % \
- (substring, str(res), which))
+ "'%s' not in '%s' (response is '%s') for test '%s'" % \
+ (substring, str(res),
+ getattr(res.value, "response", ""),
+ which))
if response_substring:
self.failUnlessIn(response_substring, res.value.response,
"'%s' not in '%s' for test '%s'" % \
self.fail("%s was supposed to Error(302), not get '%s'" %
(which, res))
-
class Web(WebMixin, WebErrorMixin, testutil.StallMixin, testutil.ReallyEqualMixin, unittest.TestCase):
def test_create(self):
pass
def test_welcome(self):
d = self.GET("/")
def _check(res):
- self.failUnlessIn('Welcome to Tahoe-LAFS', res)
+ self.failUnlessIn('<title>Tahoe-LAFS - Welcome</title>', res)
self.failUnlessIn(FAVICON_MARKUP, res)
- self.failUnlessIn('href="https://tahoe-lafs.org/"', res)
+ self.failUnlessIn('<a href="status">Recent and Active Operations</a>', res)
+ self.failUnlessIn('<a href="statistics">Operational Statistics</a>', res)
+ self.failUnless(re.search('<input (type="hidden" |name="t" |value="report-incident" ){3}/>',res), res)
+ self.failUnlessIn('Page rendered at', res)
+ self.failUnlessIn('Tahoe-LAFS code imported from:', res)
+ res_u = res.decode('utf-8')
+ self.failUnlessIn(u'<td>fake_nickname \u263A</td>', res_u)
+ self.failUnlessIn(u'<div class="nickname">other_nickname \u263B</div>', res_u)
+ self.failUnlessIn(u'Connected to <span>1</span>\n of <span>2</span> known storage servers', res_u)
+ self.failUnless(re.search(u'<div class="status-indicator"><img (src="img/connected-yes.png" |alt="Connected" ){2}/></div>\n <a( class="timestamp"| title="1970-01-01 13:00:10"){2}>1d\u00A00h\u00A00m\u00A050s</a>', res_u), repr(res_u))
+ self.failUnless(re.search(u'<div class="status-indicator"><img (src="img/connected-no.png" |alt="Disconnected" ){2}/></div>\n <a( class="timestamp"| title="1970-01-01 13:00:25"){2}>1d\u00A00h\u00A00m\u00A035s</a>', res_u), repr(res_u))
+ self.failUnless(re.search(u'<td class="service-last-received-data"><a( class="timestamp"| title="1970-01-01 13:00:30"){2}>1d\u00A00h\u00A00m\u00A030s</a></td>', res_u), repr(res_u))
+ self.failUnless(re.search(u'<td class="service-last-received-data"><a( class="timestamp"| title="1970-01-01 13:00:35"){2}>1d\u00A00h\u00A00m\u00A025s</a></td>', res_u), repr(res_u))
+ self.failUnlessIn(u'\u00A9 <a href="https://tahoe-lafs.org/">Tahoe-LAFS Software Foundation', res_u)
+ self.failUnlessIn('<td><h3>Available</h3></td>', res)
+ self.failUnlessIn('123.5kB', res)
self.s.basedir = 'web/test_welcome'
fileutil.make_dirs("web/test_welcome")
d.addCallback(_check)
return d
+ def test_introducer_status(self):
+ class MockIntroducerClient(object):
+ def __init__(self, connected):
+ self.connected = connected
+ def connected_to_introducer(self):
+ return self.connected
+
+ d = defer.succeed(None)
+
+ # introducer not connected, unguessable furl
+ def _set_introducer_not_connected_unguessable(ign):
+ self.s.introducer_furl = "pb://someIntroducer/secret"
+ self.s.introducer_client = MockIntroducerClient(False)
+ return self.GET("/")
+ d.addCallback(_set_introducer_not_connected_unguessable)
+ def _check_introducer_not_connected_unguessable(res):
+ html = res.replace('\n', ' ')
+ self.failUnlessIn('<div class="furl">pb://someIntroducer/[censored]</div>', html)
+ self.failIfIn('pb://someIntroducer/secret', html)
+ self.failUnless(re.search('<img (alt="Disconnected" |src="img/connected-no.png" ){2}/>', html), res)
+ d.addCallback(_check_introducer_not_connected_unguessable)
+
+ # introducer connected, unguessable furl
+ def _set_introducer_connected_unguessable(ign):
+ self.s.introducer_furl = "pb://someIntroducer/secret"
+ self.s.introducer_client = MockIntroducerClient(True)
+ return self.GET("/")
+ d.addCallback(_set_introducer_connected_unguessable)
+ def _check_introducer_connected_unguessable(res):
+ html = res.replace('\n', ' ')
+ self.failUnlessIn('<div class="furl">pb://someIntroducer/[censored]</div>', html)
+ self.failIfIn('pb://someIntroducer/secret', html)
+ self.failUnless(re.search('<img (src="img/connected-yes.png" |alt="Connected" ){2}/>', html), res)
+ d.addCallback(_check_introducer_connected_unguessable)
+
+ # introducer connected, guessable furl
+ def _set_introducer_connected_guessable(ign):
+ self.s.introducer_furl = "pb://someIntroducer/introducer"
+ self.s.introducer_client = MockIntroducerClient(True)
+ return self.GET("/")
+ d.addCallback(_set_introducer_connected_guessable)
+ def _check_introducer_connected_guessable(res):
+ html = res.replace('\n', ' ')
+ self.failUnlessIn('<div class="furl">pb://someIntroducer/introducer</div>', html)
+ self.failUnless(re.search('<img (src="img/connected-yes.png" |alt="Connected" ){2}/>', html), res)
+ d.addCallback(_check_introducer_connected_guessable)
+ return d
+
+ def test_helper_status(self):
+ d = defer.succeed(None)
+
+ # set helper furl to None
+ def _set_no_helper(ign):
+ self.s.uploader.helper_furl = None
+ return self.GET("/")
+ d.addCallback(_set_no_helper)
+ def _check_no_helper(res):
+ html = res.replace('\n', ' ')
+ self.failUnless(re.search('<img (src="img/connected-not-configured.png" |alt="Not Configured" ){2}/>', html), res)
+ d.addCallback(_check_no_helper)
+
+ # enable helper, not connected
+ def _set_helper_not_connected(ign):
+ self.s.uploader.helper_furl = "pb://someHelper/secret"
+ self.s.uploader.helper_connected = False
+ return self.GET("/")
+ d.addCallback(_set_helper_not_connected)
+ def _check_helper_not_connected(res):
+ html = res.replace('\n', ' ')
+ self.failUnlessIn('<div class="furl">pb://someHelper/[censored]</div>', html)
+ self.failIfIn('pb://someHelper/secret', html)
+ self.failUnless(re.search('<img (src="img/connected-no.png" |alt="Disconnected" ){2}/>', html), res)
+ d.addCallback(_check_helper_not_connected)
+
+ # enable helper, connected
+ def _set_helper_connected(ign):
+ self.s.uploader.helper_furl = "pb://someHelper/secret"
+ self.s.uploader.helper_connected = True
+ return self.GET("/")
+ d.addCallback(_set_helper_connected)
+ def _check_helper_connected(res):
+ html = res.replace('\n', ' ')
+ self.failUnlessIn('<div class="furl">pb://someHelper/[censored]</div>', html)
+ self.failIfIn('pb://someHelper/secret', html)
+ self.failUnless(re.search('<img (src="img/connected-yes.png" |alt="Connected" ){2}/>', html), res)
+ d.addCallback(_check_helper_connected)
+ return d
+
+ def test_storage(self):
+ d = self.GET("/storage")
+ def _check(res):
+ self.failUnlessIn('Storage Server Status', res)
+ self.failUnlessIn(FAVICON_MARKUP, res)
+ res_u = res.decode('utf-8')
+ self.failUnlessIn(u'<li>Server Nickname: <span class="nickname mine">fake_nickname \u263A</span></li>', res_u)
+ d.addCallback(_check)
+ return d
+
def test_status(self):
h = self.s.get_history()
dl_num = h.list_all_download_statuses()[0].get_counter()
ret_num = h.list_all_retrieve_statuses()[0].get_counter()
d = self.GET("/status", followRedirect=True)
def _check(res):
- self.failUnlessIn('Upload and Download Status', res)
+ self.failUnlessIn('Recent and Active Operations', res)
self.failUnlessIn('"down-%d"' % dl_num, res)
self.failUnlessIn('"up-%d"' % ul_num, res)
self.failUnlessIn('"mapupdate-%d"' % mu_num, res)
def _check_upload_and_mkdir_forms(self, html):
# We should have a form to create a file, with radio buttons that allow
# the user to toggle whether it is a CHK/LIT (default), SDMF, or MDMF file.
- self.failUnlessIn('name="t" value="upload"', html)
- self.failUnlessIn('input checked="checked" type="radio" id="upload-chk" value="chk" name="format"', html)
- self.failUnlessIn('input type="radio" id="upload-sdmf" value="sdmf" name="format"', html)
- self.failUnlessIn('input type="radio" id="upload-mdmf" value="mdmf" name="format"', html)
+ self.failUnless(re.search('<input (name="t" |value="upload" |type="hidden" ){3}/>', html), html)
+ self.failUnless(re.search('<input [^/]*id="upload-chk"', html), html)
+ self.failUnless(re.search('<input [^/]*id="upload-sdmf"', html), html)
+ self.failUnless(re.search('<input [^/]*id="upload-mdmf"', html), html)
# We should also have the ability to create a mutable directory, with
# radio buttons that allow the user to toggle whether it is an SDMF (default)
# or MDMF directory.
- self.failUnlessIn('name="t" value="mkdir"', html)
- self.failUnlessIn('input checked="checked" type="radio" id="mkdir-sdmf" value="sdmf" name="format"', html)
- self.failUnlessIn('input type="radio" id="mkdir-mdmf" value="mdmf" name="format"', html)
+ self.failUnless(re.search('<input (name="t" |value="mkdir" |type="hidden" ){3}/>', html), html)
+ self.failUnless(re.search('<input [^/]*id="mkdir-sdmf"', html), html)
+ self.failUnless(re.search('<input [^/]*id="mkdir-mdmf"', html), html)
self.failUnlessIn(FAVICON_MARKUP, html)
def test_GET_DIRECTORY_html(self):
d = self.GET(self.public_url + "/foo", followRedirect=True)
def _check(html):
- self.failUnlessIn('<div class="toolbar-item"><a href="../../..">Return to Welcome page</a></div>', html)
+ self.failUnlessIn('<li class="toolbar-item"><a href="../../..">Return to Welcome page</a></li>', html)
self._check_upload_and_mkdir_forms(html)
self.failUnlessIn("quux", html)
d.addCallback(_check)
r'\s+<td align="right">%d</td>' % len(self.BAR_CONTENTS),
])
self.failUnless(re.search(get_bar, res), res)
- for label in ['unlink', 'rename/move']:
+ for label in ['unlink', 'rename/relink']:
for line in res.split("\n"):
# find the line that contains the relevant button for bar.txt
if ("form action" in line and
d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty/"))
def _check4(res):
self.failUnlessIn("directory is empty", res)
- MKDIR_BUTTON_RE=re.compile('<input type="hidden" name="t" value="mkdir" />.*<legend class="freeform-form-label">Create a new directory in this directory</legend>.*<input type="submit" value="Create" />', re.I)
+ MKDIR_BUTTON_RE=re.compile('<input (type="hidden" |name="t" |value="mkdir" ){3}/>.*<legend class="freeform-form-label">Create a new directory in this directory</legend>.*<input (type="submit" |class="btn" |value="Create" ){3}/>', re.I)
self.failUnless(MKDIR_BUTTON_RE.search(res), res)
d.addCallback(_check4)
d = self.GET("/")
def _after_get_welcome_page(res):
MKDIR_BUTTON_RE = re.compile(
- '<form action="([^"]*)" method="post".*?'
- '<input type="hidden" name="t" value="([^"]*)" />'
- '<input type="hidden" name="([^"]*)" value="([^"]*)" />'
- '<input type="submit" value="Create a directory" />',
- re.I)
- mo = MKDIR_BUTTON_RE.search(res)
+ '<form(?: action="([^"]*)"| method="post"| enctype="multipart/form-data"){3}>.*'
+ '<input (?:type="hidden" |name="t" |value="([^"]*?)" ){3}/>[ ]*'
+ '<input (?:type="hidden" |name="([^"]*)" |value="([^"]*)" ){3}/>[ ]*'
+ '<input (type="submit" |class="btn" |value="Create a directory[^"]*" ){3}/>')
+ html = res.replace('\n', ' ')
+ mo = MKDIR_BUTTON_RE.search(html)
+ self.failUnless(mo, html)
formaction = mo.group(1)
formt = mo.group(2)
formaname = mo.group(3)
d.addCallback(self.failUnlessIsEmptyJSON)
return d
+ def test_POST_rename_file_no_replace_same_link(self):
+ d = self.POST(self.public_url + "/foo", t="rename",
+ replace="false", from_name="bar.txt", to_name="bar.txt")
+ d.addCallback(lambda res: self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_rename_file_replace_only_files(self):
+ d = self.POST(self.public_url + "/foo", t="rename",
+ replace="only-files", from_name="bar.txt",
+ to_name="baz.txt")
+ d.addCallback(lambda res: self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_rename_file_replace_only_files_conflict(self):
+ d = self.shouldFail2(error.Error, "POST_relink_file_replace_only_files_conflict",
+ "409 Conflict",
+ "There was already a child by that name, and you asked me to not replace it.",
+ self.POST, self.public_url + "/foo", t="relink",
+ replace="only-files", from_name="bar.txt",
+ to_name="empty")
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
def failUnlessIsEmptyJSON(self, res):
data = simplejson.loads(res)
self.failUnlessEqual(data[0], "dirnode", data)
self.failUnlessReallyEqual(len(data[1]["children"]), 0)
- def test_POST_rename_file_slash_fail(self):
+ def test_POST_rename_file_to_slash_fail(self):
d = self.POST(self.public_url + "/foo", t="rename",
from_name="bar.txt", to_name='kirk/spock.txt')
d.addBoth(self.shouldFail, error.Error,
- "test_POST_rename_file_slash_fail",
+ "test_POST_rename_file_to_slash_fail",
"400 Bad Request",
"to_name= may not contain a slash",
)
self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
return d
+ def test_POST_rename_file_from_slash_fail(self):
+ d = self.POST(self.public_url + "/foo", t="rename",
+ from_name="sub/bar.txt", to_name='spock.txt')
+ d.addBoth(self.shouldFail, error.Error,
+ "test_POST_rename_from_file_slash_fail",
+ "400 Bad Request",
+ "from_name= may not contain a slash",
+ )
+ d.addCallback(lambda res:
+ self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
+ return d
+
def test_POST_rename_dir(self):
d = self.POST(self.public_url, t="rename",
from_name="foo", to_name='plunk')
d.addCallback(self.failUnlessIsFooJSON)
return d
- def test_POST_move_file(self):
- d = self.POST(self.public_url + "/foo", t="move",
- from_name="bar.txt", to_dir="sub")
+ def test_POST_relink_file(self):
+ d = self.POST(self.public_url + "/foo", t="relink",
+ from_name="bar.txt",
+ to_dir=self.public_root.get_uri() + "/foo/sub")
d.addCallback(lambda res:
self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
d.addCallback(lambda res:
d.addCallback(self.failUnlessIsBarJSON)
return d
- def test_POST_move_file_new_name(self):
- d = self.POST(self.public_url + "/foo", t="move",
- from_name="bar.txt", to_name="wibble.txt", to_dir="sub")
+ def test_POST_relink_file_new_name(self):
+ d = self.POST(self.public_url + "/foo", t="relink",
+ from_name="bar.txt",
+ to_name="wibble.txt", to_dir=self.public_root.get_uri() + "/foo/sub")
d.addCallback(lambda res:
self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
d.addCallback(lambda res:
d.addCallback(self.failUnlessIsBarJSON)
return d
- def test_POST_move_file_replace(self):
- d = self.POST(self.public_url + "/foo", t="move",
- from_name="bar.txt", to_name="baz.txt", to_dir="sub")
+ def test_POST_relink_file_replace(self):
+ d = self.POST(self.public_url + "/foo", t="relink",
+ from_name="bar.txt",
+ to_name="baz.txt", to_dir=self.public_root.get_uri() + "/foo/sub")
d.addCallback(lambda res:
self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt"))
d.addCallback(self.failUnlessIsBarJSON)
return d
- def test_POST_move_file_no_replace(self):
- d = self.shouldFail2(error.Error, "POST_move_file_no_replace",
+ def test_POST_relink_file_no_replace(self):
+ d = self.shouldFail2(error.Error, "POST_relink_file_no_replace",
"409 Conflict",
"There was already a child by that name, and you asked me to not replace it",
- self.POST, self.public_url + "/foo", t="move",
+ self.POST, self.public_url + "/foo", t="relink",
replace="false", from_name="bar.txt",
- to_name="baz.txt", to_dir="sub")
+ to_name="baz.txt", to_dir=self.public_root.get_uri() + "/foo/sub")
d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
d.addCallback(self.failUnlessIsSubBazDotTxt)
return d
- def test_POST_move_file_slash_fail(self):
+ def test_POST_relink_file_no_replace_explicitly_same_link(self):
+ d = self.POST(self.public_url + "/foo", t="relink",
+ replace="false", from_name="bar.txt",
+ to_name="bar.txt", to_dir=self.public_root.get_uri() + "/foo")
+ d.addCallback(lambda res: self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_relink_file_replace_only_files(self):
+ d = self.POST(self.public_url + "/foo", t="relink",
+ replace="only-files", from_name="bar.txt",
+ to_name="baz.txt", to_dir=self.public_root.get_uri() + "/foo/sub")
+ d.addCallback(lambda res:
+ self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_relink_file_replace_only_files_conflict(self):
+ d = self.shouldFail2(error.Error, "POST_relink_file_replace_only_files_conflict",
+ "409 Conflict",
+ "There was already a child by that name, and you asked me to not replace it.",
+ self.POST, self.public_url + "/foo", t="relink",
+ replace="only-files", from_name="bar.txt",
+ to_name="sub", to_dir=self.public_root.get_uri() + "/foo")
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_relink_file_to_slash_fail(self):
d = self.shouldFail2(error.Error, "test_POST_rename_file_slash_fail",
"400 Bad Request",
"to_name= may not contain a slash",
- self.POST, self.public_url + "/foo", t="move",
+ self.POST, self.public_url + "/foo", t="relink",
from_name="bar.txt",
- to_name="slash/fail.txt", to_dir="sub")
+ to_name="slash/fail.txt", to_dir=self.public_root.get_uri() + "/foo/sub")
d.addCallback(lambda res:
self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
d.addCallback(lambda res:
"400 Bad Request",
"from_name= may not contain a slash",
self.POST, self.public_url + "/foo",
- t="move",
+ t="relink",
from_name="nope/bar.txt",
- to_name="fail.txt", to_dir="sub"))
+ to_name="fail.txt",
+ to_dir=self.public_root.get_uri() + "/foo/sub"))
return d
- def test_POST_move_file_no_target(self):
- d = self.shouldFail2(error.Error, "POST_move_file_no_target",
- "400 Bad Request",
- "move requires from_name and to_dir",
- self.POST, self.public_url + "/foo", t="move",
- from_name="bar.txt", to_name="baz.txt")
+ def test_POST_relink_file_explicitly_same_link(self):
+ d = self.POST(self.public_url + "/foo", t="relink",
+ from_name="bar.txt",
+ to_name="bar.txt", to_dir=self.public_root.get_uri() + "/foo")
+ d.addCallback(lambda res: self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_relink_file_implicitly_same_link(self):
+ d = self.POST(self.public_url + "/foo", t="relink",
+ from_name="bar.txt")
+ d.addCallback(lambda res: self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_relink_file_same_dir(self):
+ d = self.POST(self.public_url + "/foo", t="relink",
+ from_name="bar.txt",
+ to_name="baz.txt", to_dir=self.public_root.get_uri() + "/foo")
+ d.addCallback(lambda res: self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res: self.failUnlessNodeHasChild(self._sub_node, u"baz.txt"))
d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt"))
- d.addCallback(self.failUnlessIsBazDotTxt)
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
return d
- def test_POST_move_file_bad_target_type(self):
- d = self.shouldFail2(error.Error, "test_POST_move_file_bad_target_type",
- "400 Bad Request", "invalid target_type parameter",
+ def test_POST_relink_file_bad_replace(self):
+ d = self.shouldFail2(error.Error, "test_POST_relink_file_bad_replace",
+ "400 Bad Request", "invalid replace= argument: 'boogabooga'",
self.POST,
- self.public_url + "/foo", t="move",
- target_type="*D", from_name="bar.txt",
- to_dir="sub")
+ self.public_url + "/foo", t="relink",
+ replace="boogabooga", from_name="bar.txt",
+ to_dir=self.public_root.get_uri() + "/foo/sub")
return d
- def test_POST_move_file_multi_level(self):
+ def test_POST_relink_file_multi_level(self):
d = self.POST(self.public_url + "/foo/sub/level2?t=mkdir", "")
- d.addCallback(lambda res: self.POST(self.public_url + "/foo", t="move",
- from_name="bar.txt", to_dir="sub/level2"))
+ d.addCallback(lambda res: self.POST(self.public_url + "/foo", t="relink",
+ from_name="bar.txt", to_dir=self.public_root.get_uri() + "/foo/sub/level2"))
d.addCallback(lambda res: self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
d.addCallback(lambda res: self.failIfNodeHasChild(self._sub_node, u"bar.txt"))
d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/level2/bar.txt"))
d.addCallback(self.failUnlessIsBarJSON)
return d
- def test_POST_move_file_to_uri(self):
- d = self.POST(self.public_url + "/foo", t="move", target_type="uri",
+ def test_POST_relink_file_to_uri(self):
+ d = self.POST(self.public_url + "/foo", t="relink", target_type="uri",
from_name="bar.txt", to_dir=self._sub_uri)
d.addCallback(lambda res:
self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
d.addCallback(self.failUnlessIsBarJSON)
return d
- def test_POST_move_file_to_nonexist_dir(self):
- d = self.shouldFail2(error.Error, "POST_move_file_to_nonexist_dir",
+ def test_POST_relink_file_to_nonexistent_dir(self):
+ d = self.shouldFail2(error.Error, "POST_relink_file_to_nonexistent_dir",
"404 Not Found", "No such child: nopechucktesta",
- self.POST, self.public_url + "/foo", t="move",
- from_name="bar.txt", to_dir="nopechucktesta")
+ self.POST, self.public_url + "/foo", t="relink",
+ from_name="bar.txt",
+ to_dir=self.public_root.get_uri() + "/nopechucktesta")
return d
- def test_POST_move_file_into_file(self):
- d = self.shouldFail2(error.Error, "POST_move_file_into_file",
+ def test_POST_relink_file_into_file(self):
+ d = self.shouldFail2(error.Error, "POST_relink_file_into_file",
"400 Bad Request", "to_dir is not a directory",
- self.POST, self.public_url + "/foo", t="move",
- from_name="bar.txt", to_dir="baz.txt")
+ self.POST, self.public_url + "/foo", t="relink",
+ from_name="bar.txt",
+ to_dir=self.public_root.get_uri() + "/foo/baz.txt")
d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt"))
d.addCallback(self.failUnlessIsBazDotTxt)
d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarJSON)
return d
- def test_POST_move_file_to_bad_uri(self):
- d = self.shouldFail2(error.Error, "POST_move_file_to_bad_uri",
+ def test_POST_relink_file_to_bad_uri(self):
+ d = self.shouldFail2(error.Error, "POST_relink_file_to_bad_uri",
"400 Bad Request", "to_dir is not a directory",
- self.POST, self.public_url + "/foo", t="move",
- from_name="bar.txt", target_type="uri",
+ self.POST, self.public_url + "/foo", t="relink",
+ from_name="bar.txt",
to_dir="URI:DIR2:mn5jlyjnrjeuydyswlzyui72i:rmneifcj6k6sycjljjhj3f6majsq2zqffydnnul5hfa4j577arma")
d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
d.addCallback(self.failUnlessIsBarDotTxt)
d.addCallback(self.failUnlessIsBarJSON)
return d
- def test_POST_move_dir(self):
- d = self.POST(self.public_url + "/foo", t="move",
- from_name="bar.txt", to_dir="empty")
+ def test_POST_relink_dir(self):
+ d = self.POST(self.public_url + "/foo", t="relink",
+ from_name="bar.txt",
+ to_dir=self.public_root.get_uri() + "/foo/empty")
d.addCallback(lambda res: self.POST(self.public_url + "/foo",
- t="move", from_name="empty", to_dir="sub"))
+ t="relink", from_name="empty",
+ to_dir=self.public_root.get_uri() + "/foo/sub"))
d.addCallback(lambda res:
self.failIfNodeHasChild(self._foo_node, u"empty"))
d.addCallback(lambda res:
d = self.GET(self.public_url + "/foo?t=rename-form&name=bar.txt",
followRedirect=True)
def _check(res):
- self.failUnlessIn('name="when_done" value="."', res)
- self.failUnless(re.search(r'name="from_name" value="bar\.txt"', res))
+ self.failUnless(re.search('<input (name="when_done" |value="." |type="hidden" ){3}/>', res), res)
+ self.failUnless(re.search(r'<input (readonly="true" |type="text" |name="from_name" |value="bar\.txt" ){4}/>', res), res)
self.failUnlessIn(FAVICON_MARKUP, res)
d.addCallback(_check)
return d
d = self.POST("/report_incident", details="eek")
def _done(res):
self.failIfIn("<html>", res)
- self.failUnlessIn("Thank you for your report!", res)
+ self.failUnlessIn("An incident report has been saved", res)
d.addCallback(_done)
return d
def test_welcome(self):
basedir = "web.IntroducerWeb.test_welcome"
os.mkdir(basedir)
- fileutil.write(os.path.join(basedir, "tahoe.cfg"), "[node]\nweb.port = tcp:0\n")
+ cfg = "\n".join(["[node]",
+ "tub.location = 127.0.0.1:1",
+ "web.port = tcp:0",
+ ]) + "\n"
+ fileutil.write(os.path.join(basedir, "tahoe.cfg"), cfg)
self.node = IntroducerNode(basedir)
self.ws = self.node.getServiceNamed("webish")
def _check(res):
self.failUnlessIn('Welcome to the Tahoe-LAFS Introducer', res)
self.failUnlessIn(FAVICON_MARKUP, res)
+ self.failUnlessIn('Page rendered at', res)
+ self.failUnlessIn('Tahoe-LAFS code imported from:', res)
d.addCallback(_check)
return d
self.failUnlessReallyEqual(common.parse_replace_arg("false"), False)
self.failUnlessReallyEqual(common.parse_replace_arg("only-files"),
"only-files")
- self.shouldFail(AssertionError, "test_parse_replace_arg", "",
- common.parse_replace_arg, "only_fles")
+ self.failUnlessRaises(common.WebError, common.parse_replace_arg, "only_fles")
def test_abbreviate_time(self):
self.failUnlessReallyEqual(common.abbreviate_time(None), "")
r = simplejson.loads(res)
self.failUnlessEqual(r["summary"], "Healthy")
self.failUnless(r["results"]["healthy"])
- self.failIf(r["results"]["needs-rebalancing"])
+ self.failIfIn("needs-rebalancing", r["results"])
self.failUnless(r["results"]["recoverable"])
d.addCallback(_got_json_good)
self.failUnlessEqual(r["summary"],
"Not Healthy: 9 shares (enc 3-of-10)")
self.failIf(r["results"]["healthy"])
- self.failIf(r["results"]["needs-rebalancing"])
self.failUnless(r["results"]["recoverable"])
+ self.failIfIn("needs-rebalancing", r["results"])
d.addCallback(_got_json_sick)
d.addCallback(self.CHECK, "dead", "t=check")
self.failUnlessEqual(r["summary"],
"Not Healthy: 1 shares (enc 3-of-10)")
self.failIf(r["results"]["healthy"])
- self.failIf(r["results"]["needs-rebalancing"])
self.failIf(r["results"]["recoverable"])
+ self.failIfIn("needs-rebalancing", r["results"])
d.addCallback(_got_json_dead)
d.addCallback(self.CHECK, "corrupt", "t=check&verify=true")
self.failUnlessIn("Unhealthy: 9 shares (enc 3-of-10)", r["summary"])
self.failIf(r["results"]["healthy"])
self.failUnless(r["results"]["recoverable"])
+ self.failIfIn("needs-rebalancing", r["results"])
+ self.failUnlessReallyEqual(r["results"]["count-happiness"], 9)
self.failUnlessReallyEqual(r["results"]["count-shares-good"], 9)
self.failUnlessReallyEqual(r["results"]["count-corrupt-shares"], 1)
d.addCallback(_got_json_corrupt)
self.failUnlessEqual(u0["type"], "directory")
self.failUnlessReallyEqual(to_str(u0["cap"]), self.rootnode.get_uri())
u0cr = u0["check-results"]
+ self.failUnlessReallyEqual(u0cr["results"]["count-happiness"], 10)
self.failUnlessReallyEqual(u0cr["results"]["count-shares-good"], 10)
ugood = [u for u in units
if u["type"] == "file" and u["path"] == [u"good"]][0]
self.failUnlessReallyEqual(to_str(ugood["cap"]), self.uris["good"])
ugoodcr = ugood["check-results"]
+ self.failUnlessReallyEqual(ugoodcr["results"]["count-happiness"], 10)
self.failUnlessReallyEqual(ugoodcr["results"]["count-shares-good"], 10)
stats = units[-1]
self.failUnlessEqual(last_unit["path"], ["subdir"])
r = last_unit["check-results"]["results"]
self.failUnlessReallyEqual(r["count-recoverable-versions"], 0)
+ self.failUnlessReallyEqual(r["count-happiness"], 1)
self.failUnlessReallyEqual(r["count-shares-good"], 1)
self.failUnlessReallyEqual(r["recoverable"], False)
d.addCallback(_check_broken_deepcheck)
self.failUnlessReallyEqual(to_str(u0["cap"]), self.rootnode.get_uri())
u0crr = u0["check-and-repair-results"]
self.failUnlessReallyEqual(u0crr["repair-attempted"], False)
+ self.failUnlessReallyEqual(u0crr["pre-repair-results"]["results"]["count-happiness"], 10)
self.failUnlessReallyEqual(u0crr["pre-repair-results"]["results"]["count-shares-good"], 10)
ugood = [u for u in units
self.failUnlessEqual(to_str(ugood["cap"]), self.uris["good"])
ugoodcrr = ugood["check-and-repair-results"]
self.failUnlessReallyEqual(ugoodcrr["repair-attempted"], False)
+ self.failUnlessReallyEqual(ugoodcrr["pre-repair-results"]["results"]["count-happiness"], 10)
self.failUnlessReallyEqual(ugoodcrr["pre-repair-results"]["results"]["count-shares-good"], 10)
usick = [u for u in units
usickcrr = usick["check-and-repair-results"]
self.failUnlessReallyEqual(usickcrr["repair-attempted"], True)
self.failUnlessReallyEqual(usickcrr["repair-successful"], True)
+ self.failUnlessReallyEqual(usickcrr["pre-repair-results"]["results"]["count-happiness"], 9)
self.failUnlessReallyEqual(usickcrr["pre-repair-results"]["results"]["count-shares-good"], 9)
+ self.failUnlessReallyEqual(usickcrr["post-repair-results"]["results"]["count-happiness"], 10)
self.failUnlessReallyEqual(usickcrr["post-repair-results"]["results"]["count-shares-good"], 10)
stats = units[-1]
self.basedir = "web/Grid/exceptions"
self.set_up_grid(num_clients=1, num_servers=2)
c0 = self.g.clients[0]
- c0.DEFAULT_ENCODING_PARAMETERS['happy'] = 2
+ c0.encoding_params['happy'] = 2
self.fileurls = {}
DATA = "data" * 100
d = c0.create_dirnode()
d.addCallback(lambda ignored: self.GET(self.fileurls["dir-0share"]))
def _check_0shares_dir_html(body):
- self.failUnlessIn("<html>", body)
+ self.failUnlessIn(DIR_HTML_TAG, body)
# we should see the regular page, but without the child table or
# the dirops forms
body = " ".join(body.strip().split())
# and some-shares like we did for immutable files (since there
# are different sorts of advice to offer in each case). For now,
# they present the same way.
- self.failUnlessIn("<html>", body)
+ self.failUnlessIn(DIR_HTML_TAG, body)
body = " ".join(body.strip().split())
self.failUnlessIn('href="?t=info">More info on this directory',
body)
d.addCallback(_stash_dir)
d.addCallback(lambda ign: self.GET(self.dir_url, followRedirect=True))
def _check_dir_html(body):
- self.failUnlessIn("<html>", body)
+ self.failUnlessIn(DIR_HTML_TAG, body)
self.failUnlessIn("blacklisted.txt</a>", body)
d.addCallback(_check_dir_html)
d.addCallback(lambda ign: self.GET(self.url))
# We should still be able to list the parent directory, in HTML...
d.addCallback(lambda ign: self.GET(self.dir_url, followRedirect=True))
def _check_dir_html2(body):
- self.failUnlessIn("<html>", body)
+ self.failUnlessIn(DIR_HTML_TAG, body)
self.failUnlessIn("blacklisted.txt</strike>", body)
d.addCallback(_check_dir_html2)
self.child_url = "uri/"+dn.get_readonly_uri()+"/child"
d.addCallback(_get_dircap)
d.addCallback(lambda ign: self.GET(self.dir_url_base, followRedirect=True))
- d.addCallback(lambda body: self.failUnlessIn("<html>", body))
+ d.addCallback(lambda body: self.failUnlessIn(DIR_HTML_TAG, body))
d.addCallback(lambda ign: self.GET(self.dir_url_json1))
d.addCallback(lambda res: simplejson.loads(res)) # just check it decodes
d.addCallback(lambda ign: self.GET(self.dir_url_json2))