self._sub_uri = sub_uri
foo.set_uri(u"sub", sub_uri, sub_uri)
sub = self.s.create_node_from_uri(sub_uri)
+ self._sub_node = sub
_ign, n, blocking_uri = self.makefile(1)
foo.set_uri(u"blockingfile", blocking_uri, blocking_uri)
# still think of it as an umlaut
foo.set_uri(unicode_filename, self._bar_txt_uri, self._bar_txt_uri)
- _ign, n, baz_file = self.makefile(2)
+ self.SUBBAZ_CONTENTS, n, baz_file = self.makefile(2)
self._baz_file_uri = baz_file
sub.set_uri(u"baz.txt", baz_file, baz_file)
def failUnlessIsBazDotTxt(self, res):
self.failUnlessReallyEqual(res, self.BAZ_CONTENTS, res)
+ def failUnlessIsSubBazDotTxt(self, res):
+ self.failUnlessReallyEqual(res, self.SUBBAZ_CONTENTS, res)
+
def failUnlessIsBarJSON(self, res):
data = simplejson.loads(res)
self.failUnless(isinstance(data, list))
r'\s+<td align="right">%d</td>' % len(self.BAR_CONTENTS),
])
self.failUnless(re.search(get_bar, res), res)
- for label in ['unlink', 'rename']:
+ for label in ['unlink', 'rename', 'move']:
for line in res.split("\n"):
# find the line that contains the relevant button for bar.txt
if ("form action" in line and
d.addCallback(self.failUnlessIsFooJSON)
return d
+ def test_POST_move_file(self):
+ """"""
+ d = self.POST(self.public_url + "/foo", t="move",
+ from_name="bar.txt", to_dir="sub")
+ d.addCallback(lambda res:
+ self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res:
+ self.failUnlessNodeHasChild(self._sub_node, u"bar.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_move_file_new_name(self):
+ d = self.POST(self.public_url + "/foo", t="move",
+ from_name="bar.txt", to_name="wibble.txt", to_dir="sub")
+ d.addCallback(lambda res:
+ self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res:
+ self.failIfNodeHasChild(self._sub_node, u"bar.txt"))
+ d.addCallback(lambda res:
+ self.failUnlessNodeHasChild(self._sub_node, u"wibble.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/wibble.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/wibble.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_move_file_replace(self):
+ d = self.POST(self.public_url + "/foo", t="move",
+ from_name="bar.txt", to_name="baz.txt", to_dir="sub")
+ d.addCallback(lambda res:
+ self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_move_file_no_replace(self):
+ d = self.POST(self.public_url + "/foo", t="move", replace="false",
+ from_name="bar.txt", to_name="baz.txt", to_dir="sub")
+ d.addBoth(self.shouldFail, error.Error,
+ "POST_move_file_no_replace",
+ "409 Conflict",
+ "There was already a child by that name, and you asked me "
+ "to not replace it")
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/baz.txt"))
+ d.addCallback(self.failUnlessIsSubBazDotTxt)
+ return d
+
+ def test_POST_move_file_slash_fail(self):
+ d = self.POST(self.public_url + "/foo", t="move",
+ from_name="bar.txt", to_name="slash/fail.txt", to_dir="sub")
+ d.addBoth(self.shouldFail, error.Error,
+ "test_POST_rename_file_slash_fail",
+ "400 Bad Request",
+ "to_name= may not contain a slash",
+ )
+ d.addCallback(lambda res:
+ self.failUnlessNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res:
+ self.failIfNodeHasChild(self._sub_node, u"slash/fail.txt"))
+ return d
+
+ def test_POST_move_file_no_target(self):
+ d = self.POST(self.public_url + "/foo", t="move",
+ from_name="bar.txt", to_name="baz.txt")
+ d.addBoth(self.shouldFail, error.Error,
+ "POST_move_file_no_target",
+ "400 Bad Request",
+ "move requires from_name and to_dir")
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt"))
+ d.addCallback(self.failUnlessIsBazDotTxt)
+ return d
+
+ def test_POST_move_file_multi_level(self):
+ d = self.POST(self.public_url + "/foo/sub/level2?t=mkdir", "")
+ d.addCallback(lambda res: self.POST(self.public_url + "/foo", t="move",
+ from_name="bar.txt", to_dir="sub/level2"))
+ d.addCallback(lambda res: self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res: self.failIfNodeHasChild(self._sub_node, u"bar.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/level2/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/level2/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_move_file_to_uri(self):
+ d = self.POST(self.public_url + "/foo", t="move",
+ from_name="bar.txt", to_dir=self._sub_uri)
+ d.addCallback(lambda res:
+ self.failIfNodeHasChild(self._foo_node, u"bar.txt"))
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/sub/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_move_file_to_nonexist_dir(self):
+ d = self.POST(self.public_url + "/foo", t="move",
+ from_name="bar.txt", to_dir="notchucktesta")
+ d.addBoth(self.shouldFail, error.Error,
+ "POST_move_file_to_nonexist_dir",
+ "404 Not Found",
+ "No such child: notchucktesta")
+ return d
+
+ def test_POST_move_file_into_file(self):
+ d = self.POST(self.public_url + "/foo", t="move",
+ from_name="bar.txt", to_dir="baz.txt")
+ d.addBoth(self.shouldFail, error.Error,
+ "POST_move_file_into_file",
+ "410 Gone",
+ "to_dir is not a usable directory")
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/baz.txt"))
+ d.addCallback(self.failUnlessIsBazDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
+ def test_POST_move_file_to_bad_uri(self):
+ d = self.POST(self.public_url + "/foo", t="move", from_name="bar.txt",
+ to_dir="URI:DIR2:mn5jlyjnrjeuydyswlzyui72i:rmneifcj6k6sycjljjhj3f6majsq2zqffydnnul5hfa4j577arma")
+ d.addBoth(self.shouldFail, error.Error,
+ "POST_move_file_to_bad_uri",
+ "410 Gone",
+ "to_dir is not a usable directory")
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
+ d.addCallback(self.failUnlessIsBarDotTxt)
+ d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt?t=json"))
+ d.addCallback(self.failUnlessIsBarJSON)
+ return d
+
def shouldRedirect(self, res, target=None, statuscode=None, which=""):
""" If target is not None then the redirection has to go to target. If
statuscode is not None then the redirection has to be accomplished with
d.addCallback(_check)
return d
+ def test_GET_move_form(self):
+ d = self.GET(self.public_url + "/foo?t=move-form&name=bar.txt",
+ followRedirect=True)
+ def _check(res):
+ self.failUnless('name="when_done" value="."' in res, res)
+ self.failUnless(re.search(r'name="from_name" value="bar\.txt"', res))
+ d.addCallback(_check)
+ return d
+
def log(self, res, msg):
#print "MSG: %s RES: %s" % (msg, res)
log.msg(msg)
from foolscap.api import fireEventually
from allmydata.util import base32, time_format
-from allmydata.uri import from_string_dirnode
+from allmydata.uri import from_string_dirnode, is_writeable_directory_uri
from allmydata.interfaces import IDirectoryNode, IFileNode, IFilesystemNode, \
IImmutableFileNode, IMutableFileNode, ExistingChildError, \
NoSuchChildError, EmptyPathnameComponentError, SDMF_VERSION, MDMF_VERSION
return DirectoryReadonlyURI(ctx, self.node)
if t == 'rename-form':
return RenameForm(self.node)
+ if t == 'move-form':
+ return MoveForm(self.node)
raise WebError("GET directory: bad t=%s" % t)
d = self._POST_unlink(req)
elif t == "rename":
d = self._POST_rename(req)
+ elif t == "move":
+ d = self._POST_move(req)
elif t == "check":
d = self._POST_check(req)
elif t == "start-deep-check":
d.addCallback(lambda res: "thing renamed")
return d
+ def _POST_move(self, req):
+ charset = get_arg(req, "_charset", "utf-8")
+ from_name = get_arg(req, "from_name")
+ if from_name is not None:
+ from_name = from_name.strip()
+ from_name = from_name.decode(charset)
+ assert isinstance(from_name, unicode)
+ to_name = get_arg(req, "to_name")
+ if to_name is not None:
+ to_name = to_name.strip()
+ to_name = to_name.decode(charset)
+ assert isinstance(to_name, unicode)
+ if not to_name:
+ to_name = from_name
+ to_dir = get_arg(req, "to_dir")
+ if to_dir is not None:
+ to_dir = to_dir.strip()
+ to_dir = to_dir.decode(charset)
+ assert isinstance(to_dir, unicode)
+ if not from_name or not to_dir:
+ raise WebError("move requires from_name and to_dir")
+ replace = boolean_of_arg(get_arg(req, "replace", "true"))
+
+ # allow from_name to contain slashes, so they can fix names that
+ # were accidentally created with them. But disallow them in to_name
+ # (if it's specified), to discourage the practice.
+ if to_name and "/" in to_name:
+ raise WebError("to_name= may not contain a slash", http.BAD_REQUEST)
+
+ d = self.node.has_child(to_dir.split('/')[0])
+ def get_target_node(isname):
+ if isname or not is_writeable_directory_uri(str(to_dir)):
+ return self.node.get_child_at_path(to_dir)
+ else:
+ return self.client.create_node_from_uri(str(to_dir))
+ d.addCallback(get_target_node)
+ def is_target_node_usable(target_node):
+ if not IDirectoryNode.providedBy(target_node):
+ raise WebError("to_dir is not a usable directory", http.GONE)
+ return target_node
+ d.addCallback(is_target_node_usable)
+ d.addCallback(lambda new_parent: self.node.move_child_to(
+ from_name, new_parent, to_name, replace))
+ d.addCallback(lambda res: "thing moved")
+ return d
+
def _maybe_literal(self, res, Results_Class):
if res:
return Results_Class(self.client, res)
if self.node.is_unknown() or self.node.is_readonly():
unlink = "-"
rename = "-"
+ move = "-"
else:
# this creates a button which will cause our _POST_unlink method
# to be invoked, which unlinks the file and then redirects the
T.input(type='submit', value='rename', name="rename"),
]
+ move = T.form(action=here, method="get")[
+ T.input(type='hidden', name='t', value='move-form'),
+ T.input(type='hidden', name='name', value=name),
+ T.input(type='hidden', name='when_done', value="."),
+ T.input(type='submit', value='move', name="move"),
+ ]
+
ctx.fillSlots("unlink", unlink)
ctx.fillSlots("rename", rename)
+ ctx.fillSlots("move", move)
times = []
linkcrtime = metadata.get('tahoe', {}).get("linkcrtime")
ctx.tag.attributes['value'] = name
return ctx.tag
+class MoveForm(rend.Page):
+ addSlash = True
+ docFactory = getxmlfile("move-form.xhtml")
+
+ def render_title(self, ctx, data):
+ return ctx.tag["Directory SI=%s" % abbreviated_dirnode(self.original)]
+
+ def render_header(self, ctx, data):
+ header = ["Move "
+ "from directory SI=%s" % abbreviated_dirnode(self.original),
+ ]
+
+ if self.original.is_readonly():
+ header.append(" (readonly!)")
+ header.append(":")
+ return ctx.tag[header]
+
+ def render_when_done(self, ctx, data):
+ return T.input(type="hidden", name="when_done", value=".")
+
+ def render_get_name(self, ctx, data):
+ req = IRequest(ctx)
+ name = get_arg(req, "name", "")
+ ctx.tag.attributes['value'] = name
+ return ctx.tag
+
class ManifestResults(rend.Page, ReloadMixin):
docFactory = getxmlfile("manifest.xhtml")