1 import re, os.path, urllib
3 from twisted.application import service
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.web import client, error, http
7 from twisted.python import failure, log
8 from allmydata import interfaces, provisioning, uri, webish
9 from allmydata.util import fileutil
10 from allmydata.test.common import NonGridDirectoryNode, FakeCHKFileNode, FakeMutableFileNode, create_chk_filenode
11 from allmydata.interfaces import IURI, INewDirectoryURI, IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode
13 # create a fake uploader/downloader, and a couple of fake dirnodes, then
14 # create a webserver that works against them
16 class FakeClient(service.MultiService):
17 nodeid = "fake_nodeid"
18 basedir = "fake_basedir"
19 def get_versions(self):
20 return {'allmydata': "fake",
25 introducer_furl = "None"
26 def connected_to_introducer(self):
28 def get_all_peerids(self):
31 def create_node_from_uri(self, uri):
33 if (INewDirectoryURI.providedBy(u)
34 or IReadonlyNewDirectoryURI.providedBy(u)):
35 return NonGridDirectoryNode(self).init_from_uri(u)
36 if IFileURI.providedBy(u):
37 return FakeCHKFileNode(u, self)
38 assert IMutableFileURI.providedBy(u), u
39 return FakeMutableFileNode(self).init_from_uri(u)
41 def create_empty_dirnode(self, wait_for_numpeers=None):
42 n = NonGridDirectoryNode(self)
43 d = n.create(wait_for_numpeers)
44 d.addCallback(lambda res: n)
47 def create_mutable_file(self, contents="", wait_for_numpeers=None):
48 n = FakeMutableFileNode(self)
49 return n.create(contents)
51 def upload(self, uploadable, wait_for_numpeers=None):
52 d = uploadable.get_size()
53 d.addCallback(lambda size: uploadable.read(size))
56 n = create_chk_filenode(self, data)
58 d.addCallback(_got_data)
62 class WebMixin(object):
66 self.ws = s = webish.WebishServer("0")
67 s.allow_local_access(True)
68 s.setServiceParent(self.s)
69 port = s.listener._port.getHost().port
70 self.webish_url = "http://localhost:%d" % port
72 l = [ self.s.create_empty_dirnode() for x in range(6) ]
73 d = defer.DeferredList(l)
75 self.public_root = res[0][1]
76 assert interfaces.IDirectoryNode.providedBy(self.public_root), res
77 self.public_url = "/uri/" + self.public_root.get_uri()
78 self.private_root = res[1][1]
82 self._foo_uri = foo.get_uri()
83 self._foo_readonly_uri = foo.get_readonly_uri()
84 # NOTE: we ignore the deferred on all set_uri() calls, because we
85 # know the fake nodes do these synchronously
86 self.public_root.set_uri("foo", foo.get_uri())
88 self.BAR_CONTENTS, n, self._bar_txt_uri = self.makefile(0)
89 foo.set_uri("bar.txt", self._bar_txt_uri)
90 foo.set_uri("empty", res[3][1].get_uri())
91 sub_uri = res[4][1].get_uri()
92 foo.set_uri("sub", sub_uri)
93 sub = self.s.create_node_from_uri(sub_uri)
95 _ign, n, blocking_uri = self.makefile(1)
96 foo.set_uri("blockingfile", blocking_uri)
98 _ign, n, baz_file = self.makefile(2)
99 sub.set_uri("baz.txt", baz_file)
101 _ign, n, self._bad_file_uri = self.makefile(3)
102 # this uri should not be downloadable
103 del FakeCHKFileNode.all_contents[self._bad_file_uri]
106 self.public_root.set_uri("reedownlee", rodir.get_readonly_uri())
107 rodir.set_uri("nor", baz_file)
112 # public/foo/blockingfile
115 # public/foo/sub/baz.txt
117 # public/reedownlee/nor
118 self.NEWFILE_CONTENTS = "newfile contents\n"
122 def makefile(self, number):
123 contents = "contents of file %s\n" % number
124 n = create_chk_filenode(self.s, contents)
125 return contents, n, n.get_uri()
128 return self.s.stopService()
130 def failUnlessIsBarDotTxt(self, res):
131 self.failUnlessEqual(res, self.BAR_CONTENTS)
133 def failUnlessIsBarJSON(self, res):
134 data = simplejson.loads(res)
135 self.failUnless(isinstance(data, list))
136 self.failUnlessEqual(data[0], "filenode")
137 self.failUnless(isinstance(data[1], dict))
138 self.failIf("rw_uri" in data[1]) # immutable
139 self.failUnlessEqual(data[1]["ro_uri"], self._bar_txt_uri)
140 self.failUnlessEqual(data[1]["size"], len(self.BAR_CONTENTS))
142 def failUnlessIsFooJSON(self, res):
143 data = simplejson.loads(res)
144 self.failUnless(isinstance(data, list))
145 self.failUnlessEqual(data[0], "dirnode", res)
146 self.failUnless(isinstance(data[1], dict))
147 self.failUnless("rw_uri" in data[1]) # mutable
148 self.failUnlessEqual(data[1]["rw_uri"], self._foo_uri)
149 self.failUnlessEqual(data[1]["ro_uri"], self._foo_readonly_uri)
151 kidnames = sorted(data[1]["children"])
152 self.failUnlessEqual(kidnames,
153 ["bar.txt", "blockingfile", "empty", "sub"])
154 kids = data[1]["children"]
155 self.failUnlessEqual(kids["sub"][0], "dirnode")
156 self.failUnlessEqual(kids["bar.txt"][0], "filenode")
157 self.failUnlessEqual(kids["bar.txt"][1]["size"], len(self.BAR_CONTENTS))
158 self.failUnlessEqual(kids["bar.txt"][1]["ro_uri"], self._bar_txt_uri)
160 def GET(self, urlpath, followRedirect=False):
161 url = self.webish_url + urlpath
162 return client.getPage(url, method="GET", followRedirect=followRedirect)
164 def PUT(self, urlpath, data):
165 url = self.webish_url + urlpath
166 return client.getPage(url, method="PUT", postdata=data)
168 def DELETE(self, urlpath):
169 url = self.webish_url + urlpath
170 return client.getPage(url, method="DELETE")
172 def POST(self, urlpath, followRedirect=False, **fields):
173 url = self.webish_url + urlpath
174 sepbase = "boogabooga"
178 form.append('Content-Disposition: form-data; name="_charset"')
182 for name, value in fields.iteritems():
183 if isinstance(value, tuple):
184 filename, value = value
185 form.append('Content-Disposition: form-data; name="%s"; '
186 'filename="%s"' % (name, filename))
188 form.append('Content-Disposition: form-data; name="%s"' % name)
190 form.append(str(value))
193 body = "\r\n".join(form) + "\r\n"
194 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
196 return client.getPage(url, method="POST", postdata=body,
197 headers=headers, followRedirect=followRedirect)
199 def shouldFail(self, res, expected_failure, which,
200 substring=None, response_substring=None):
201 if isinstance(res, failure.Failure):
202 res.trap(expected_failure)
204 self.failUnless(substring in str(res),
205 "substring '%s' not in '%s'"
206 % (substring, str(res)))
207 if response_substring:
208 self.failUnless(response_substring in res.value.response,
209 "respose substring '%s' not in '%s'"
210 % (response_substring, res.value.response))
212 self.fail("%s was supposed to raise %s, not get '%s'" %
213 (which, expected_failure, res))
215 def should404(self, res, which):
216 if isinstance(res, failure.Failure):
217 res.trap(error.Error)
218 self.failUnlessEqual(res.value.status, "404")
220 self.fail("%s was supposed to Error(404), not get '%s'" %
223 def shouldHTTPError(self, res, which, code=None, substring=None,
224 response_substring=None):
225 if isinstance(res, failure.Failure):
226 res.trap(error.Error)
228 self.failUnlessEqual(res.value.status, str(code))
230 self.failUnless(substring in str(res),
231 "substring '%s' not in '%s'"
232 % (substring, str(res)))
233 if response_substring:
234 self.failUnless(response_substring in res.value.response,
235 "respose substring '%s' not in '%s'"
236 % (response_substring, res.value.response))
238 self.fail("%s was supposed to Error(%s), not get '%s'" %
241 class Web(WebMixin, unittest.TestCase):
242 def test_create(self):
245 def test_welcome(self):
248 self.failUnless('Welcome To AllMyData' in res)
249 self.failUnless('Tahoe' in res)
250 self.failUnless('personal vdrive not available.' in res)
252 self.s.basedir = 'web/test_welcome'
253 fileutil.make_dirs("web/test_welcome")
254 fileutil.make_dirs("web/test_welcome/private")
255 self.ws.create_start_html("private_uri",
256 "web/test_welcome/private/start.html",
257 "web/test_welcome/node.url")
259 d.addCallback(_check)
261 # We shouldn't link to the start.html page since we don't have a
262 # private directory cap.
263 self.failIf('To view your personal private non-shared' in res)
264 self.failIf('from your local filesystem:' in res)
265 self.failIf(os.path.abspath('web/test_welcome/private/start.html') in res)
266 d.addCallback(_check2)
269 def test_provisioning_math(self):
270 self.failUnlessEqual(provisioning.binomial(10, 0), 1)
271 self.failUnlessEqual(provisioning.binomial(10, 1), 10)
272 self.failUnlessEqual(provisioning.binomial(10, 2), 45)
273 self.failUnlessEqual(provisioning.binomial(10, 9), 10)
274 self.failUnlessEqual(provisioning.binomial(10, 10), 1)
276 def test_provisioning(self):
277 d = self.GET("/provisioning/")
279 self.failUnless('Tahoe Provisioning Tool' in res)
280 fields = {'filled': True,
281 "num_users": int(50e3),
282 "files_per_user": 1000,
283 "space_per_user": int(1e9),
284 "sharing_ratio": 1.0,
285 "encoding_parameters": "3-of-10-5",
287 "ownership_mode": "A",
288 "download_rate": 100,
293 return self.POST("/provisioning/", **fields)
295 d.addCallback(_check)
297 self.failUnless('Tahoe Provisioning Tool' in res)
298 self.failUnless("Share space consumed: 167.01TB" in res)
300 fields = {'filled': True,
301 "num_users": int(50e6),
302 "files_per_user": 1000,
303 "space_per_user": int(5e9),
304 "sharing_ratio": 1.0,
305 "encoding_parameters": "25-of-100-50",
306 "num_servers": 30000,
307 "ownership_mode": "E",
308 "drive_failure_model": "U",
310 "download_rate": 1000,
315 return self.POST("/provisioning/", **fields)
316 d.addCallback(_check2)
318 self.failUnless("Share space consumed: huge!" in res)
319 fields = {'filled': True}
320 return self.POST("/provisioning/", **fields)
321 d.addCallback(_check3)
323 self.failUnless("Share space consumed:" in res)
324 d.addCallback(_check4)
327 def test_start_html(self):
328 fileutil.make_dirs("web")
329 fileutil.make_dirs("web/private")
330 startfile = "web/private/start.html"
331 nodeurlfile = "web/node.url"
332 self.ws.create_start_html("private_uri", startfile, nodeurlfile)
334 self.failUnless(os.path.exists(startfile))
335 start_html = open(startfile, "r").read()
336 self.failUnless(self.webish_url in start_html)
337 private_url = self.webish_url + "/uri/private_uri"
338 self.failUnless(private_url in start_html)
340 self.failUnless(os.path.exists(nodeurlfile))
341 nodeurl = open(nodeurlfile, "r").read().strip()
342 self.failUnless(nodeurl.startswith("http://localhost"))
344 def test_GET_FILEURL(self):
345 d = self.GET(self.public_url + "/foo/bar.txt")
346 d.addCallback(self.failUnlessIsBarDotTxt)
349 def test_GET_FILEURL_save(self):
350 d = self.GET(self.public_url + "/foo/bar.txt?save=bar.txt")
351 # TODO: look at the headers, expect a Content-Disposition: attachment
353 d.addCallback(self.failUnlessIsBarDotTxt)
356 def test_GET_FILEURL_download(self):
357 d = self.GET(self.public_url + "/foo/bar.txt?t=download")
358 d.addCallback(self.failUnlessIsBarDotTxt)
361 def test_GET_FILEURL_missing(self):
362 d = self.GET(self.public_url + "/foo/missing")
363 d.addBoth(self.should404, "test_GET_FILEURL_missing")
366 def test_PUT_NEWFILEURL(self):
367 d = self.PUT(self.public_url + "/foo/new.txt", self.NEWFILE_CONTENTS)
368 # TODO: we lose the response code, so we can't check this
369 #self.failUnlessEqual(responsecode, 201)
370 d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "new.txt")
371 d.addCallback(lambda res:
372 self.failUnlessChildContentsAre(self._foo_node, "new.txt",
373 self.NEWFILE_CONTENTS))
376 def test_PUT_NEWFILEURL_replace(self):
377 d = self.PUT(self.public_url + "/foo/bar.txt", self.NEWFILE_CONTENTS)
378 # TODO: we lose the response code, so we can't check this
379 #self.failUnlessEqual(responsecode, 200)
380 d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "bar.txt")
381 d.addCallback(lambda res:
382 self.failUnlessChildContentsAre(self._foo_node, "bar.txt",
383 self.NEWFILE_CONTENTS))
386 def test_PUT_NEWFILEURL_no_replace(self):
387 d = self.PUT(self.public_url + "/foo/bar.txt?replace=false",
388 self.NEWFILE_CONTENTS)
389 d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_no_replace",
391 "There was already a child by that name, and you asked me "
395 def test_PUT_NEWFILEURL_mkdirs(self):
396 d = self.PUT(self.public_url + "/foo/newdir/new.txt", self.NEWFILE_CONTENTS)
398 d.addCallback(self.failUnlessURIMatchesChild, fn, "newdir/new.txt")
399 d.addCallback(lambda res: self.failIfNodeHasChild(fn, "new.txt"))
400 d.addCallback(lambda res: self.failUnlessNodeHasChild(fn, "newdir"))
401 d.addCallback(lambda res:
402 self.failUnlessChildContentsAre(fn, "newdir/new.txt",
403 self.NEWFILE_CONTENTS))
406 def test_PUT_NEWFILEURL_blocked(self):
407 d = self.PUT(self.public_url + "/foo/blockingfile/new.txt",
408 self.NEWFILE_CONTENTS)
409 d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_blocked",
411 "cannot create directory because there is a file in the way")
414 def test_DELETE_FILEURL(self):
415 d = self.DELETE(self.public_url + "/foo/bar.txt")
416 d.addCallback(lambda res:
417 self.failIfNodeHasChild(self._foo_node, "bar.txt"))
420 def test_DELETE_FILEURL_missing(self):
421 d = self.DELETE(self.public_url + "/foo/missing")
422 d.addBoth(self.should404, "test_DELETE_FILEURL_missing")
425 def test_DELETE_FILEURL_missing2(self):
426 d = self.DELETE(self.public_url + "/missing/missing")
427 d.addBoth(self.should404, "test_DELETE_FILEURL_missing2")
430 def test_GET_FILEURL_json(self):
431 # twisted.web.http.parse_qs ignores any query args without an '=', so
432 # I can't do "GET /path?json", I have to do "GET /path/t=json"
433 # instead. This may make it tricky to emulate the S3 interface
435 d = self.GET(self.public_url + "/foo/bar.txt?t=json")
436 d.addCallback(self.failUnlessIsBarJSON)
439 def test_GET_FILEURL_json_missing(self):
440 d = self.GET(self.public_url + "/foo/missing?json")
441 d.addBoth(self.should404, "test_GET_FILEURL_json_missing")
444 def disable_local_access(self, res=None):
445 self.ws.allow_local_access(False)
448 def test_GET_FILEURL_localfile(self):
449 localfile = os.path.abspath("web/GET_FILEURL_local file")
450 url = (self.public_url + "/foo/bar.txt?t=download&localfile=%s" %
451 urllib.quote(localfile))
452 fileutil.make_dirs("web")
455 self.failUnless(os.path.exists(localfile))
456 data = open(localfile, "rb").read()
457 self.failUnlessEqual(data, self.BAR_CONTENTS)
461 def test_GET_FILEURL_localfile_disabled(self):
462 localfile = os.path.abspath("web/GET_FILEURL_local file_disabled")
463 url = (self.public_url + "/foo/bar.txt?t=download&localfile=%s" %
464 urllib.quote(localfile))
465 fileutil.make_dirs("web")
466 self.disable_local_access()
468 d.addBoth(self.shouldFail, error.Error, "localfile disabled",
470 "local file access is disabled")
473 def test_GET_FILEURL_localfile_nonlocal(self):
474 # TODO: somehow pretend that we aren't local, and verify that the
475 # server refuses to write to local files, probably by changing the
476 # server's idea of what counts as "local".
477 old_LOCALHOST = webish.LOCALHOST
478 webish.LOCALHOST = "127.0.0.2"
479 localfile = os.path.abspath("web/GET_FILEURL_local file_nonlocal")
480 fileutil.make_dirs("web")
481 d = self.GET(self.public_url + "/foo/bar.txt?t=download&localfile=%s"
482 % urllib.quote(localfile))
483 d.addBoth(self.shouldFail, error.Error, "localfile non-local",
485 "localfile= or localdir= requires a local connection")
487 self.failIf(os.path.exists(localfile))
488 d.addCallback(_check)
490 webish.LOCALHOST = old_LOCALHOST
495 def test_GET_FILEURL_localfile_nonabsolute(self):
496 localfile = "web/nonabsolute/path"
497 fileutil.make_dirs("web/nonabsolute")
498 d = self.GET(self.public_url + "/foo/bar.txt?t=download&localfile=%s"
499 % urllib.quote(localfile))
500 d.addBoth(self.shouldFail, error.Error, "localfile non-absolute",
502 "localfile= or localdir= requires an absolute path")
504 self.failIf(os.path.exists(localfile))
505 d.addCallback(_check)
508 def test_PUT_NEWFILEURL_localfile(self):
509 localfile = os.path.abspath("web/PUT_NEWFILEURL_local file")
510 url = (self.public_url + "/foo/new.txt?t=upload&localfile=%s" %
511 urllib.quote(localfile))
512 fileutil.make_dirs("web")
513 f = open(localfile, "wb")
514 f.write(self.NEWFILE_CONTENTS)
516 d = self.PUT(url, "")
517 d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "new.txt")
518 d.addCallback(lambda res:
519 self.failUnlessChildContentsAre(self._foo_node, "new.txt",
520 self.NEWFILE_CONTENTS))
523 def test_PUT_NEWFILEURL_localfile_disabled(self):
524 localfile = os.path.abspath("web/PUT_NEWFILEURL_local file_disabled")
525 url = (self.public_url + "/foo/new.txt?t=upload&localfile=%s" %
526 urllib.quote(localfile))
527 fileutil.make_dirs("web")
528 f = open(localfile, "wb")
529 f.write(self.NEWFILE_CONTENTS)
531 self.disable_local_access()
532 d = self.PUT(url, "")
533 d.addBoth(self.shouldFail, error.Error, "put localfile disabled",
535 "local file access is disabled")
538 def test_PUT_NEWFILEURL_localfile_mkdirs(self):
539 localfile = os.path.abspath("web/PUT_NEWFILEURL_local file_mkdirs")
540 fileutil.make_dirs("web")
541 f = open(localfile, "wb")
542 f.write(self.NEWFILE_CONTENTS)
544 d = self.PUT(self.public_url + "/foo/newdir/new.txt?t=upload&localfile=%s"
545 % urllib.quote(localfile), "")
546 d.addCallback(self.failUnlessURIMatchesChild,
547 self._foo_node, "newdir/new.txt")
548 d.addCallback(lambda res:
549 self.failIfNodeHasChild(self._foo_node, "new.txt"))
550 d.addCallback(lambda res:
551 self.failUnlessNodeHasChild(self._foo_node, "newdir"))
552 d.addCallback(lambda res:
553 self.failUnlessChildContentsAre(self._foo_node,
555 self.NEWFILE_CONTENTS))
558 def test_GET_FILEURL_uri(self):
559 d = self.GET(self.public_url + "/foo/bar.txt?t=uri")
561 self.failUnlessEqual(res, self._bar_txt_uri)
562 d.addCallback(_check)
563 d.addCallback(lambda res:
564 self.GET(self.public_url + "/foo/bar.txt?t=readonly-uri"))
566 # for now, for files, uris and readonly-uris are the same
567 self.failUnlessEqual(res, self._bar_txt_uri)
568 d.addCallback(_check2)
571 def test_GET_FILEURL_uri_missing(self):
572 d = self.GET(self.public_url + "/foo/missing?t=uri")
573 d.addBoth(self.should404, "test_GET_FILEURL_uri_missing")
576 def test_GET_DIRURL(self):
577 # the addSlash means we get a redirect here
578 d = self.GET(self.public_url + "/foo", followRedirect=True)
580 # the FILE reference points to a URI, but it should end in bar.txt
581 self.failUnless(re.search(r'<td>'
582 '<a href="[^"]+bar.txt">bar.txt</a>'
585 '\s+<td>%d</td>' % len(self.BAR_CONTENTS)
587 self.failUnless(re.search(r'<td><a href="sub">sub</a></td>'
588 '\s+<td>DIR</td>', res))
589 d.addCallback(_check)
591 # look at a directory which is readonly
592 d.addCallback(lambda res:
593 self.GET(self.public_url + "/reedownlee", followRedirect=True))
595 self.failUnless("(readonly)" in res, res)
596 self.failIf("Upload a file" in res, res)
597 d.addCallback(_check2)
599 # and at a directory that contains a readonly directory
600 d.addCallback(lambda res:
601 self.GET(self.public_url, followRedirect=True))
603 self.failUnless(re.search(r'<td><a href="reedownlee">reedownlee</a>'
604 '</td>\s+<td>DIR-RO</td>', res))
605 d.addCallback(_check3)
609 def test_GET_DIRURL_large(self):
610 # Nevow has a problem showing more than about 192 children of a
611 # directory: it uses defer.success() and d.addCallback in a way that
612 # can make the stack grow very quickly. See ticket #237 for details.
613 # To work around this, I think we'll need to put a 'return
614 # defer.fireEventually' in our render_ method. This test is intended
615 # to trigger the bug (and eventually verify that our workaround
616 # actually works), but it isn't yet failing for me.
618 d = self.s.create_empty_dirnode()
620 def _created(dirnode):
621 entries = [ (str(i), self._foo_uri) for i in range(COUNT) ]
622 d2 = dirnode.set_uris(entries)
623 d2.addCallback(lambda res: dirnode)
625 d.addCallback(_created)
628 large_url = "/uri/" + dirnode.get_uri() + "/"
629 return self.GET(large_url)
630 d.addCallback(_check)
633 self.failUnless('<a href="%d">%d</a>' % (COUNT-1, COUNT-1) in res)
634 self.failIf("maximum recursion depth exceeded" in res)
638 def test_GET_DIRURL_json(self):
639 d = self.GET(self.public_url + "/foo?t=json")
640 d.addCallback(self.failUnlessIsFooJSON)
643 def test_GET_DIRURL_manifest(self):
644 d = self.GET(self.public_url + "/foo?t=manifest", followRedirect=True)
646 self.failUnless("Refresh Capabilities" in manifest)
650 def test_GET_DIRURL_uri(self):
651 d = self.GET(self.public_url + "/foo?t=uri")
653 self.failUnlessEqual(res, self._foo_uri)
654 d.addCallback(_check)
657 def test_GET_DIRURL_readonly_uri(self):
658 d = self.GET(self.public_url + "/foo?t=readonly-uri")
660 self.failUnlessEqual(res, self._foo_readonly_uri)
661 d.addCallback(_check)
664 def test_PUT_NEWDIRURL(self):
665 d = self.PUT(self.public_url + "/foo/newdir?t=mkdir", "")
666 d.addCallback(lambda res:
667 self.failUnlessNodeHasChild(self._foo_node, "newdir"))
668 d.addCallback(lambda res: self._foo_node.get("newdir"))
669 d.addCallback(self.failUnlessNodeKeysAre, [])
672 def test_PUT_NEWDIRURL_replace(self):
673 d = self.PUT(self.public_url + "/foo/sub?t=mkdir", "")
674 d.addCallback(lambda res:
675 self.failUnlessNodeHasChild(self._foo_node, "sub"))
676 d.addCallback(lambda res: self._foo_node.get("sub"))
677 d.addCallback(self.failUnlessNodeKeysAre, [])
680 def test_PUT_NEWDIRURL_no_replace(self):
681 d = self.PUT(self.public_url + "/foo/sub?t=mkdir&replace=false", "")
682 d.addBoth(self.shouldFail, error.Error, "PUT_NEWDIRURL_no_replace",
684 "There was already a child by that name, and you asked me "
686 d.addCallback(lambda res:
687 self.failUnlessNodeHasChild(self._foo_node, "sub"))
688 d.addCallback(lambda res: self._foo_node.get("sub"))
689 d.addCallback(self.failUnlessNodeKeysAre, ["baz.txt"])
692 def test_PUT_NEWDIRURL_mkdirs(self):
693 d = self.PUT(self.public_url + "/foo/subdir/newdir?t=mkdir", "")
694 d.addCallback(lambda res:
695 self.failIfNodeHasChild(self._foo_node, "newdir"))
696 d.addCallback(lambda res:
697 self.failUnlessNodeHasChild(self._foo_node, "subdir"))
698 d.addCallback(lambda res:
699 self._foo_node.get_child_at_path("subdir/newdir"))
700 d.addCallback(self.failUnlessNodeKeysAre, [])
703 def test_DELETE_DIRURL(self):
704 d = self.DELETE(self.public_url + "/foo")
705 d.addCallback(lambda res:
706 self.failIfNodeHasChild(self.public_root, "foo"))
709 def test_DELETE_DIRURL_missing(self):
710 d = self.DELETE(self.public_url + "/foo/missing")
711 d.addBoth(self.should404, "test_DELETE_DIRURL_missing")
712 d.addCallback(lambda res:
713 self.failUnlessNodeHasChild(self.public_root, "foo"))
716 def test_DELETE_DIRURL_missing2(self):
717 d = self.DELETE(self.public_url + "/missing")
718 d.addBoth(self.should404, "test_DELETE_DIRURL_missing2")
721 def test_walker(self):
723 def _visitor(path, node, metadata):
724 out.append((path, node))
725 return defer.succeed(None)
726 w = webish.DirnodeWalkerMixin()
727 d = w.walk(self.public_root, _visitor)
729 names = [path for (path,node) in out]
730 self.failUnlessEqual(sorted(names),
733 ('foo','blockingfile'),
736 ('foo','sub','baz.txt'),
738 ('reedownlee', 'nor'),
740 subindex = names.index( ('foo', 'sub') )
741 bazindex = names.index( ('foo', 'sub', 'baz.txt') )
742 self.failUnless(subindex < bazindex)
743 for path,node in out:
744 if path[-1] in ('bar.txt', 'blockingfile', 'baz.txt', 'nor'):
745 self.failUnless(interfaces.IFileNode.providedBy(node))
747 self.failUnless(interfaces.IDirectoryNode.providedBy(node))
748 d.addCallback(_check)
751 def test_GET_DIRURL_localdir(self):
752 localdir = os.path.abspath("web/GET_DIRURL_local dir")
753 fileutil.make_dirs("web")
754 d = self.GET(self.public_url + "/foo?t=download&localdir=%s" %
755 urllib.quote(localdir))
757 barfile = os.path.join(localdir, "bar.txt")
758 self.failUnless(os.path.exists(barfile))
759 data = open(barfile, "rb").read()
760 self.failUnlessEqual(data, self.BAR_CONTENTS)
761 blockingfile = os.path.join(localdir, "blockingfile")
762 self.failUnless(os.path.exists(blockingfile))
763 subdir = os.path.join(localdir, "sub")
764 self.failUnless(os.path.isdir(subdir))
765 d.addCallback(_check)
768 def test_GET_DIRURL_localdir_disabled(self):
769 localdir = os.path.abspath("web/GET_DIRURL_local dir_disabled")
770 fileutil.make_dirs("web")
771 self.disable_local_access()
772 d = self.GET(self.public_url + "/foo?t=download&localdir=%s" %
773 urllib.quote(localdir))
774 d.addBoth(self.shouldFail, error.Error, "localfile disabled",
776 "local file access is disabled")
779 def test_GET_DIRURL_localdir_nonabsolute(self):
780 localdir = "web/nonabsolute/dir path"
781 fileutil.make_dirs("web/nonabsolute")
782 d = self.GET(self.public_url + "/foo?t=download&localdir=%s" %
783 urllib.quote(localdir))
784 d.addBoth(self.shouldFail, error.Error, "localdir non-absolute",
786 "localfile= or localdir= requires an absolute path")
788 self.failIf(os.path.exists(localdir))
789 d.addCallback(_check)
792 def touch(self, localdir, filename):
793 path = os.path.join(localdir, filename)
795 f.write("contents of %s\n" % filename)
800 w = webish.DirnodeWalkerMixin()
801 def visitor(childpath, childnode, metadata):
803 d = w.walk(self.public_root, visitor)
806 def failUnlessNodeKeysAre(self, node, expected_keys):
808 def _check(children):
809 self.failUnlessEqual(sorted(children.keys()), sorted(expected_keys))
810 d.addCallback(_check)
812 def failUnlessNodeHasChild(self, node, name):
814 def _check(children):
815 self.failUnless(name in children)
816 d.addCallback(_check)
818 def failIfNodeHasChild(self, node, name):
820 def _check(children):
821 self.failIf(name in children)
822 d.addCallback(_check)
825 def failUnlessChildContentsAre(self, node, name, expected_contents):
826 d = node.get_child_at_path(name)
827 d.addCallback(lambda node: node.download_to_data())
828 def _check(contents):
829 self.failUnlessEqual(contents, expected_contents)
830 d.addCallback(_check)
833 def failUnlessChildURIIs(self, node, name, expected_uri):
834 d = node.get_child_at_path(name)
836 self.failUnlessEqual(child.get_uri(), expected_uri.strip())
837 d.addCallback(_check)
840 def failUnlessURIMatchesChild(self, got_uri, node, name):
841 d = node.get_child_at_path(name)
843 self.failUnlessEqual(got_uri.strip(), child.get_uri())
844 d.addCallback(_check)
847 def failUnlessCHKURIHasContents(self, got_uri, contents):
848 self.failUnless(FakeCHKFileNode.all_contents[got_uri] == contents)
850 def test_PUT_NEWDIRURL_localdir(self):
851 localdir = os.path.abspath("web/PUT_NEWDIRURL_local dir")
852 # create some files there
853 fileutil.make_dirs(os.path.join(localdir, "one"))
854 fileutil.make_dirs(os.path.join(localdir, "one/sub"))
855 fileutil.make_dirs(os.path.join(localdir, "two"))
856 fileutil.make_dirs(os.path.join(localdir, "three"))
857 self.touch(localdir, "three/foo.txt")
858 self.touch(localdir, "three/bar.txt")
859 self.touch(localdir, "zap.zip")
861 d = self.PUT(self.public_url + "/newdir?t=upload&localdir=%s"
862 % urllib.quote(localdir), "")
863 pr = self.public_root
864 d.addCallback(lambda res: self.failUnlessNodeHasChild(pr, "newdir"))
865 d.addCallback(lambda res: pr.get("newdir"))
866 d.addCallback(self.failUnlessNodeKeysAre,
867 ["one", "two", "three", "zap.zip"])
868 d.addCallback(lambda res: pr.get_child_at_path("newdir/one"))
869 d.addCallback(self.failUnlessNodeKeysAre, ["sub"])
870 d.addCallback(lambda res: pr.get_child_at_path("newdir/three"))
871 d.addCallback(self.failUnlessNodeKeysAre, ["foo.txt", "bar.txt"])
872 d.addCallback(lambda res: pr.get_child_at_path("newdir/three/bar.txt"))
873 d.addCallback(lambda barnode: barnode.download_to_data())
874 d.addCallback(lambda contents:
875 self.failUnlessEqual(contents,
876 "contents of three/bar.txt\n"))
879 def test_PUT_NEWDIRURL_localdir_disabled(self):
880 localdir = os.path.abspath("web/PUT_NEWDIRURL_local dir_disabled")
881 # create some files there
882 fileutil.make_dirs(os.path.join(localdir, "one"))
883 fileutil.make_dirs(os.path.join(localdir, "one/sub"))
884 fileutil.make_dirs(os.path.join(localdir, "two"))
885 fileutil.make_dirs(os.path.join(localdir, "three"))
886 self.touch(localdir, "three/foo.txt")
887 self.touch(localdir, "three/bar.txt")
888 self.touch(localdir, "zap.zip")
890 self.disable_local_access()
891 d = self.PUT(self.public_url + "/newdir?t=upload&localdir=%s"
892 % urllib.quote(localdir), "")
893 d.addBoth(self.shouldFail, error.Error, "localfile disabled",
895 "local file access is disabled")
898 def test_PUT_NEWDIRURL_localdir_mkdirs(self):
899 localdir = os.path.abspath("web/PUT_NEWDIRURL_local dir_mkdirs")
900 # create some files there
901 fileutil.make_dirs(os.path.join(localdir, "one"))
902 fileutil.make_dirs(os.path.join(localdir, "one/sub"))
903 fileutil.make_dirs(os.path.join(localdir, "two"))
904 fileutil.make_dirs(os.path.join(localdir, "three"))
905 self.touch(localdir, "three/foo.txt")
906 self.touch(localdir, "three/bar.txt")
907 self.touch(localdir, "zap.zip")
909 d = self.PUT(self.public_url + "/foo/subdir/newdir?t=upload&localdir=%s"
910 % urllib.quote(localdir),
913 d.addCallback(lambda res: self.failUnlessNodeHasChild(fn, "subdir"))
914 d.addCallback(lambda res: fn.get_child_at_path("subdir/newdir"))
915 d.addCallback(self.failUnlessNodeKeysAre,
916 ["one", "two", "three", "zap.zip"])
917 d.addCallback(lambda res: fn.get_child_at_path("subdir/newdir/one"))
918 d.addCallback(self.failUnlessNodeKeysAre, ["sub"])
919 d.addCallback(lambda res: fn.get_child_at_path("subdir/newdir/three"))
920 d.addCallback(self.failUnlessNodeKeysAre, ["foo.txt", "bar.txt"])
921 d.addCallback(lambda res:
922 fn.get_child_at_path("subdir/newdir/three/bar.txt"))
923 d.addCallback(lambda barnode: barnode.download_to_data())
924 d.addCallback(lambda contents:
925 self.failUnlessEqual(contents,
926 "contents of three/bar.txt\n"))
929 def test_POST_upload(self):
930 d = self.POST(self.public_url + "/foo", t="upload",
931 file=("new.txt", self.NEWFILE_CONTENTS))
933 d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
934 d.addCallback(lambda res:
935 self.failUnlessChildContentsAre(fn, "new.txt",
936 self.NEWFILE_CONTENTS))
939 def test_POST_upload_no_link(self):
940 d = self.POST("/uri/", t="upload",
941 file=("new.txt", self.NEWFILE_CONTENTS))
942 d.addCallback(self.failUnlessCHKURIHasContents, self.NEWFILE_CONTENTS)
945 def test_POST_upload_no_link_whendone(self):
946 d = self.POST("/uri/", t="upload", when_done="/",
947 file=("new.txt", self.NEWFILE_CONTENTS))
948 d.addBoth(self.shouldRedirect, "/")
949 # XXX Test that resulting welcome page has a "most recent
950 # upload", the URI of which points to the file contents that
953 test_POST_upload_no_link_whendone.todo = "Not yet implemented."
955 def test_POST_upload_mutable(self):
956 # this creates a mutable file
957 d = self.POST(self.public_url + "/foo", t="upload", mutable="true",
958 file=("new.txt", self.NEWFILE_CONTENTS))
960 d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
961 d.addCallback(lambda res:
962 self.failUnlessChildContentsAre(fn, "new.txt",
963 self.NEWFILE_CONTENTS))
964 d.addCallback(lambda res: self._foo_node.get("new.txt"))
966 self.failUnless(IMutableFileNode.providedBy(newnode))
967 self.failUnless(newnode.is_mutable())
968 self.failIf(newnode.is_readonly())
969 self._mutable_uri = newnode.get_uri()
972 # now upload it again and make sure that the URI doesn't change
973 NEWER_CONTENTS = self.NEWFILE_CONTENTS + "newer\n"
974 d.addCallback(lambda res:
975 self.POST(self.public_url + "/foo", t="upload",
977 file=("new.txt", NEWER_CONTENTS)))
978 d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
979 d.addCallback(lambda res:
980 self.failUnlessChildContentsAre(fn, "new.txt",
982 d.addCallback(lambda res: self._foo_node.get("new.txt"))
984 self.failUnless(IMutableFileNode.providedBy(newnode))
985 self.failUnless(newnode.is_mutable())
986 self.failIf(newnode.is_readonly())
987 self.failUnlessEqual(self._mutable_uri, newnode.get_uri())
990 # finally list the directory, since mutable files are displayed
993 d.addCallback(lambda res:
994 self.GET(self.public_url + "/foo",
995 followRedirect=True))
996 def _check_page(res):
997 # TODO: assert more about the contents
998 self.failUnless("Overwrite" in res)
999 self.failUnless("Choose new file:" in res)
1001 d.addCallback(_check_page)
1003 # test that clicking on the "overwrite" button works
1004 EVEN_NEWER_CONTENTS = NEWER_CONTENTS + "even newer\n"
1005 OVERWRITE_FORM_RE=re.compile('<form action="(.*)" method="post" enctype="multipart/form-data"><fieldset><input type="hidden" name="t" value="overwrite" /><input type="hidden" name="name" value="(.*)" /><input type="hidden" name="when_done" value="(.*)" /><legend class="freeform-form-label">Overwrite</legend>Choose new file: <input type="file" class="freeform-input-file" name="file" /> <input type="submit" value="Overwrite" /></fieldset></form>', re.I)
1006 def _parse_overwrite_form_and_submit(res):
1007 mo = OVERWRITE_FORM_RE.search(res)
1009 formaction=mo.group(1)
1010 formname=mo.group(2)
1011 formwhendone=mo.group(3)
1013 if formaction == ".":
1014 formaction = self.public_url + "/foo"
1015 return self.POST(formaction, t="overwrite", name=formname, when_done=formwhendone, file=("new.txt", EVEN_NEWER_CONTENTS), followRedirect=False)
1016 d.addCallback(_parse_overwrite_form_and_submit)
1017 d.addBoth(self.shouldRedirect, urllib.quote(self.public_url + "/foo/"))
1018 d.addCallback(lambda res:
1019 self.failUnlessChildContentsAre(fn, "new.txt",
1020 EVEN_NEWER_CONTENTS))
1021 d.addCallback(lambda res: self._foo_node.get("new.txt"))
1023 self.failUnless(IMutableFileNode.providedBy(newnode))
1024 self.failUnless(newnode.is_mutable())
1025 self.failIf(newnode.is_readonly())
1026 self.failUnlessEqual(self._mutable_uri, newnode.get_uri())
1027 d.addCallback(_got3)
1031 def test_POST_upload_replace(self):
1032 d = self.POST(self.public_url + "/foo", t="upload",
1033 file=("bar.txt", self.NEWFILE_CONTENTS))
1035 d.addCallback(self.failUnlessURIMatchesChild, fn, "bar.txt")
1036 d.addCallback(lambda res:
1037 self.failUnlessChildContentsAre(fn, "bar.txt",
1038 self.NEWFILE_CONTENTS))
1041 def test_POST_upload_no_replace_queryarg(self):
1042 d = self.POST(self.public_url + "/foo?replace=false", t="upload",
1043 file=("bar.txt", self.NEWFILE_CONTENTS))
1044 d.addBoth(self.shouldFail, error.Error,
1045 "POST_upload_no_replace_queryarg",
1047 "There was already a child by that name, and you asked me "
1048 "to not replace it")
1049 d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
1050 d.addCallback(self.failUnlessIsBarDotTxt)
1053 def test_POST_upload_no_replace_field(self):
1054 d = self.POST(self.public_url + "/foo", t="upload", replace="false",
1055 file=("bar.txt", self.NEWFILE_CONTENTS))
1056 d.addBoth(self.shouldFail, error.Error, "POST_upload_no_replace_field",
1058 "There was already a child by that name, and you asked me "
1059 "to not replace it")
1060 d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
1061 d.addCallback(self.failUnlessIsBarDotTxt)
1064 def test_POST_upload_whendone(self):
1065 d = self.POST(self.public_url + "/foo", t="upload", when_done="/THERE",
1066 file=("new.txt", self.NEWFILE_CONTENTS))
1067 d.addBoth(self.shouldRedirect, "/THERE")
1069 d.addCallback(lambda res:
1070 self.failUnlessChildContentsAre(fn, "new.txt",
1071 self.NEWFILE_CONTENTS))
1074 def test_POST_upload_named(self):
1076 d = self.POST(self.public_url + "/foo", t="upload",
1077 name="new.txt", file=self.NEWFILE_CONTENTS)
1078 d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
1079 d.addCallback(lambda res:
1080 self.failUnlessChildContentsAre(fn, "new.txt",
1081 self.NEWFILE_CONTENTS))
1084 def test_POST_upload_named_badfilename(self):
1085 d = self.POST(self.public_url + "/foo", t="upload",
1086 name="slashes/are/bad.txt", file=self.NEWFILE_CONTENTS)
1087 d.addBoth(self.shouldFail, error.Error,
1088 "test_POST_upload_named_badfilename",
1090 "name= may not contain a slash",
1092 # make sure that nothing was added
1093 d.addCallback(lambda res:
1094 self.failUnlessNodeKeysAre(self._foo_node,
1095 ["bar.txt", "blockingfile",
1099 def test_POST_mkdir(self): # return value?
1100 d = self.POST(self.public_url + "/foo", t="mkdir", name="newdir")
1101 d.addCallback(lambda res: self._foo_node.get("newdir"))
1102 d.addCallback(self.failUnlessNodeKeysAre, [])
1105 def test_POST_mkdir_no_parentdir_noredirect(self):
1106 d = self.POST("/uri/?t=mkdir")
1107 def _after_mkdir(res):
1108 self.failUnless(uri.is_string_newdirnode_rw(res))
1109 d.addCallback(_after_mkdir)
1112 def test_POST_mkdir_replace(self): # return value?
1113 d = self.POST(self.public_url + "/foo", t="mkdir", name="sub")
1114 d.addCallback(lambda res: self._foo_node.get("sub"))
1115 d.addCallback(self.failUnlessNodeKeysAre, [])
1118 def test_POST_mkdir_no_replace_queryarg(self): # return value?
1119 d = self.POST(self.public_url + "/foo?replace=false", t="mkdir", name="sub")
1120 d.addBoth(self.shouldFail, error.Error,
1121 "POST_mkdir_no_replace_queryarg",
1123 "There was already a child by that name, and you asked me "
1124 "to not replace it")
1125 d.addCallback(lambda res: self._foo_node.get("sub"))
1126 d.addCallback(self.failUnlessNodeKeysAre, ["baz.txt"])
1129 def test_POST_mkdir_no_replace_field(self): # return value?
1130 d = self.POST(self.public_url + "/foo", t="mkdir", name="sub",
1132 d.addBoth(self.shouldFail, error.Error, "POST_mkdir_no_replace_field",
1134 "There was already a child by that name, and you asked me "
1135 "to not replace it")
1136 d.addCallback(lambda res: self._foo_node.get("sub"))
1137 d.addCallback(self.failUnlessNodeKeysAre, ["baz.txt"])
1140 def test_POST_mkdir_whendone_field(self):
1141 d = self.POST(self.public_url + "/foo",
1142 t="mkdir", name="newdir", when_done="/THERE")
1143 d.addBoth(self.shouldRedirect, "/THERE")
1144 d.addCallback(lambda res: self._foo_node.get("newdir"))
1145 d.addCallback(self.failUnlessNodeKeysAre, [])
1148 def test_POST_mkdir_whendone_queryarg(self):
1149 d = self.POST(self.public_url + "/foo?when_done=/THERE",
1150 t="mkdir", name="newdir")
1151 d.addBoth(self.shouldRedirect, "/THERE")
1152 d.addCallback(lambda res: self._foo_node.get("newdir"))
1153 d.addCallback(self.failUnlessNodeKeysAre, [])
1156 def test_POST_put_uri(self):
1157 contents, n, newuri = self.makefile(8)
1158 d = self.POST(self.public_url + "/foo", t="uri", name="new.txt", uri=newuri)
1159 d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "new.txt")
1160 d.addCallback(lambda res:
1161 self.failUnlessChildContentsAre(self._foo_node, "new.txt",
1165 def test_POST_put_uri_replace(self):
1166 contents, n, newuri = self.makefile(8)
1167 d = self.POST(self.public_url + "/foo", t="uri", name="bar.txt", uri=newuri)
1168 d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "bar.txt")
1169 d.addCallback(lambda res:
1170 self.failUnlessChildContentsAre(self._foo_node, "bar.txt",
1174 def test_POST_put_uri_no_replace_queryarg(self):
1175 contents, n, newuri = self.makefile(8)
1176 d = self.POST(self.public_url + "/foo?replace=false", t="uri",
1177 name="bar.txt", uri=newuri)
1178 d.addBoth(self.shouldFail, error.Error,
1179 "POST_put_uri_no_replace_queryarg",
1181 "There was already a child by that name, and you asked me "
1182 "to not replace it")
1183 d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
1184 d.addCallback(self.failUnlessIsBarDotTxt)
1187 def test_POST_put_uri_no_replace_field(self):
1188 contents, n, newuri = self.makefile(8)
1189 d = self.POST(self.public_url + "/foo", t="uri", replace="false",
1190 name="bar.txt", uri=newuri)
1191 d.addBoth(self.shouldFail, error.Error,
1192 "POST_put_uri_no_replace_field",
1194 "There was already a child by that name, and you asked me "
1195 "to not replace it")
1196 d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
1197 d.addCallback(self.failUnlessIsBarDotTxt)
1200 def test_POST_delete(self):
1201 d = self.POST(self.public_url + "/foo", t="delete", name="bar.txt")
1202 d.addCallback(lambda res: self._foo_node.list())
1203 def _check(children):
1204 self.failIf("bar.txt" in children)
1205 d.addCallback(_check)
1208 def test_POST_rename_file(self):
1209 d = self.POST(self.public_url + "/foo", t="rename",
1210 from_name="bar.txt", to_name='wibble.txt')
1211 d.addCallback(lambda res:
1212 self.failIfNodeHasChild(self._foo_node, "bar.txt"))
1213 d.addCallback(lambda res:
1214 self.failUnlessNodeHasChild(self._foo_node, "wibble.txt"))
1215 d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt"))
1216 d.addCallback(self.failUnlessIsBarDotTxt)
1217 d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt?t=json"))
1218 d.addCallback(self.failUnlessIsBarJSON)
1221 def test_POST_rename_file_replace(self):
1222 # rename a file and replace a directory with it
1223 d = self.POST(self.public_url + "/foo", t="rename",
1224 from_name="bar.txt", to_name='empty')
1225 d.addCallback(lambda res:
1226 self.failIfNodeHasChild(self._foo_node, "bar.txt"))
1227 d.addCallback(lambda res:
1228 self.failUnlessNodeHasChild(self._foo_node, "empty"))
1229 d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty"))
1230 d.addCallback(self.failUnlessIsBarDotTxt)
1231 d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
1232 d.addCallback(self.failUnlessIsBarJSON)
1235 def test_POST_rename_file_no_replace_queryarg(self):
1236 # rename a file and replace a directory with it
1237 d = self.POST(self.public_url + "/foo?replace=false", t="rename",
1238 from_name="bar.txt", to_name='empty')
1239 d.addBoth(self.shouldFail, error.Error,
1240 "POST_rename_file_no_replace_queryarg",
1242 "There was already a child by that name, and you asked me "
1243 "to not replace it")
1244 d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
1245 d.addCallback(self.failUnlessIsEmptyJSON)
1248 def test_POST_rename_file_no_replace_field(self):
1249 # rename a file and replace a directory with it
1250 d = self.POST(self.public_url + "/foo", t="rename", replace="false",
1251 from_name="bar.txt", to_name='empty')
1252 d.addBoth(self.shouldFail, error.Error,
1253 "POST_rename_file_no_replace_field",
1255 "There was already a child by that name, and you asked me "
1256 "to not replace it")
1257 d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
1258 d.addCallback(self.failUnlessIsEmptyJSON)
1261 def failUnlessIsEmptyJSON(self, res):
1262 data = simplejson.loads(res)
1263 self.failUnlessEqual(data[0], "dirnode", data)
1264 self.failUnlessEqual(len(data[1]["children"]), 0)
1266 def test_POST_rename_file_slash_fail(self):
1267 d = self.POST(self.public_url + "/foo", t="rename",
1268 from_name="bar.txt", to_name='kirk/spock.txt')
1269 d.addBoth(self.shouldFail, error.Error,
1270 "test_POST_rename_file_slash_fail",
1272 "to_name= may not contain a slash",
1274 d.addCallback(lambda res:
1275 self.failUnlessNodeHasChild(self._foo_node, "bar.txt"))
1276 d.addCallback(lambda res: self.POST(self.public_url, t="rename",
1277 from_name="foo/bar.txt", to_name='george.txt'))
1278 d.addBoth(self.shouldFail, error.Error,
1279 "test_POST_rename_file_slash_fail",
1281 "from_name= may not contain a slash",
1283 d.addCallback(lambda res:
1284 self.failUnlessNodeHasChild(self.public_root, "foo"))
1285 d.addCallback(lambda res:
1286 self.failIfNodeHasChild(self.public_root, "george.txt"))
1287 d.addCallback(lambda res:
1288 self.failUnlessNodeHasChild(self._foo_node, "bar.txt"))
1289 d.addCallback(lambda res: self.GET(self.public_url + "/foo?t=json"))
1290 d.addCallback(self.failUnlessIsFooJSON)
1293 def test_POST_rename_dir(self):
1294 d = self.POST(self.public_url, t="rename",
1295 from_name="foo", to_name='plunk')
1296 d.addCallback(lambda res:
1297 self.failIfNodeHasChild(self.public_root, "foo"))
1298 d.addCallback(lambda res:
1299 self.failUnlessNodeHasChild(self.public_root, "plunk"))
1300 d.addCallback(lambda res: self.GET(self.public_url + "/plunk?t=json"))
1301 d.addCallback(self.failUnlessIsFooJSON)
1304 def shouldRedirect(self, res, target):
1305 if not isinstance(res, failure.Failure):
1306 self.fail("we were expecting to get redirected to %s, not get an"
1307 " actual page: %s" % (target, res))
1308 res.trap(error.PageRedirect)
1309 # the PageRedirect does not seem to capture the uri= query arg
1310 # properly, so we can't check for it.
1311 realtarget = self.webish_url + target
1312 self.failUnlessEqual(res.value.location, realtarget)
1314 def test_GET_URI_form(self):
1315 base = "/uri?uri=%s" % self._bar_txt_uri
1316 # this is supposed to give us a redirect to /uri/$URI, plus arguments
1317 targetbase = "/uri/%s" % urllib.quote(self._bar_txt_uri)
1319 d.addBoth(self.shouldRedirect, targetbase)
1320 d.addCallback(lambda res: self.GET(base+"&filename=bar.txt"))
1321 d.addBoth(self.shouldRedirect, targetbase+"?filename=bar.txt")
1322 d.addCallback(lambda res: self.GET(base+"&t=json"))
1323 d.addBoth(self.shouldRedirect, targetbase+"?t=json")
1324 d.addCallback(self.log, "about to get file by uri")
1325 d.addCallback(lambda res: self.GET(base, followRedirect=True))
1326 d.addCallback(self.failUnlessIsBarDotTxt)
1327 d.addCallback(self.log, "got file by uri, about to get dir by uri")
1328 d.addCallback(lambda res: self.GET("/uri?uri=%s&t=json" % self._foo_uri,
1329 followRedirect=True))
1330 d.addCallback(self.failUnlessIsFooJSON)
1331 d.addCallback(self.log, "got dir by uri")
1335 def test_GET_rename_form(self):
1336 d = self.GET(self.public_url + "/foo?t=rename-form&name=bar.txt",
1337 followRedirect=True) # XXX [ ] todo: figure out why '.../foo' doesn't work
1339 self.failUnless(re.search(r'name="when_done" value=".*%s/foo/' % (urllib.quote(self.public_url),), res), (r'name="when_done" value=".*%s/foo/' % (urllib.quote(self.public_url),), res,))
1340 self.failUnless(re.search(r'name="from_name" value="bar\.txt"', res))
1341 d.addCallback(_check)
1344 def log(self, res, msg):
1345 #print "MSG: %s RES: %s" % (msg, res)
1349 def test_GET_URI_URL(self):
1350 base = "/uri/%s" % self._bar_txt_uri
1352 d.addCallback(self.failUnlessIsBarDotTxt)
1353 d.addCallback(lambda res: self.GET(base+"?filename=bar.txt"))
1354 d.addCallback(self.failUnlessIsBarDotTxt)
1355 d.addCallback(lambda res: self.GET(base+"?filename=bar.txt&save=true"))
1356 d.addCallback(self.failUnlessIsBarDotTxt)
1359 def test_GET_URI_URL_dir(self):
1360 base = "/uri/%s?t=json" % self._foo_uri
1362 d.addCallback(self.failUnlessIsFooJSON)
1365 def test_GET_URI_URL_missing(self):
1366 base = "/uri/%s" % self._bad_file_uri
1368 d.addBoth(self.shouldHTTPError, "test_GET_URI_URL_missing",
1369 http.GONE, response_substring="NotEnoughPeersError")
1370 # TODO: how can we exercise both sides of WebDownloadTarget.fail
1371 # here? we must arrange for a download to fail after target.open()
1372 # has been called, and then inspect the response to see that it is
1373 # shorter than we expected.
1376 def test_PUT_NEWFILEURL_uri(self):
1377 contents, n, new_uri = self.makefile(8)
1378 d = self.PUT(self.public_url + "/foo/new.txt?t=uri", new_uri)
1379 d.addCallback(lambda res: self.failUnlessEqual(res.strip(), new_uri))
1380 d.addCallback(lambda res:
1381 self.failUnlessChildContentsAre(self._foo_node, "new.txt",
1385 def test_PUT_NEWFILEURL_uri_replace(self):
1386 contents, n, new_uri = self.makefile(8)
1387 d = self.PUT(self.public_url + "/foo/bar.txt?t=uri", new_uri)
1388 d.addCallback(lambda res: self.failUnlessEqual(res.strip(), new_uri))
1389 d.addCallback(lambda res:
1390 self.failUnlessChildContentsAre(self._foo_node, "bar.txt",
1394 def test_PUT_NEWFILEURL_uri_no_replace(self):
1395 contents, n, new_uri = self.makefile(8)
1396 d = self.PUT(self.public_url + "/foo/bar.txt?t=uri&replace=false", new_uri)
1397 d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_uri_no_replace",
1399 "There was already a child by that name, and you asked me "
1400 "to not replace it")
1403 def test_PUT_NEWFILE_URI(self):
1404 file_contents = "New file contents here\n"
1405 d = self.PUT("/uri", file_contents)
1407 self.failUnless(uri in FakeCHKFileNode.all_contents)
1408 self.failUnlessEqual(FakeCHKFileNode.all_contents[uri],
1410 return self.GET("/uri/%s" % uri)
1411 d.addCallback(_check)
1413 self.failUnlessEqual(res, file_contents)
1414 d.addCallback(_check2)
1417 def test_PUT_NEWFILE_URI_only_PUT(self):
1418 d = self.PUT("/uri?t=bogus", "")
1419 d.addBoth(self.shouldFail, error.Error,
1420 "PUT_NEWFILE_URI_only_PUT",
1422 "/uri only accepts PUT and PUT?t=mkdir")
1425 def test_PUT_NEWDIR_URI(self):
1426 d = self.PUT("/uri?t=mkdir", "")
1428 n = self.s.create_node_from_uri(uri.strip())
1429 d2 = self.failUnlessNodeKeysAre(n, [])
1430 d2.addCallback(lambda res:
1431 self.GET("/uri/%s?t=json" % uri))
1433 d.addCallback(_check)
1434 d.addCallback(self.failUnlessIsEmptyJSON)
1437 def test_POST_check(self):
1438 d = self.POST(self.public_url + "/foo", t="check", name="bar.txt")
1440 # this returns a string form of the results, which are probably
1441 # None since we're using fake filenodes.
1442 # TODO: verify that the check actually happened, by changing
1443 # FakeCHKFileNode to count how many times .check() has been
1446 d.addCallback(_done)