1 import re, os.path, urllib
3 from twisted.application import service
4 from twisted.trial import unittest
5 from twisted.internet import defer
6 from twisted.web import client, error, http
7 from twisted.python import failure, log
8 from allmydata import webish, interfaces, provisioning
9 from allmydata.util import fileutil
10 from allmydata.test.common import NonGridDirectoryNode, FakeCHKFileNode, FakeMutableFileNode, create_chk_filenode
11 from allmydata.interfaces import IURI, INewDirectoryURI, IReadonlyNewDirectoryURI, IFileURI, IMutableFileURI, IMutableFileNode
13 # create a fake uploader/downloader, and a couple of fake dirnodes, then
14 # create a webserver that works against them
16 class FakeClient(service.MultiService):
17 nodeid = "fake_nodeid"
18 basedir = "fake_basedir"
19 def get_versions(self):
20 return {'allmydata': "fake",
25 introducer_furl = "None"
26 def connected_to_introducer(self):
28 def get_all_peerids(self):
31 def create_node_from_uri(self, uri):
33 if (INewDirectoryURI.providedBy(u)
34 or IReadonlyNewDirectoryURI.providedBy(u)):
35 return NonGridDirectoryNode(self).init_from_uri(u)
36 if IFileURI.providedBy(u):
37 return FakeCHKFileNode(u, self)
38 assert IMutableFileURI.providedBy(u), u
39 return FakeMutableFileNode(self).init_from_uri(u)
41 def create_empty_dirnode(self, wait_for_numpeers=None):
42 n = NonGridDirectoryNode(self)
43 d = n.create(wait_for_numpeers)
44 d.addCallback(lambda res: n)
47 def create_mutable_file(self, contents="", wait_for_numpeers=None):
48 n = FakeMutableFileNode(self)
49 return n.create(contents)
51 def upload(self, uploadable, wait_for_numpeers=None):
52 d = uploadable.get_size()
53 d.addCallback(lambda size: uploadable.read(size))
56 n = create_chk_filenode(self, data)
58 d.addCallback(_got_data)
62 class WebMixin(object):
66 self.ws = s = webish.WebishServer("0")
67 s.allow_local_access(True)
68 s.setServiceParent(self.s)
69 port = s.listener._port.getHost().port
70 self.webish_url = "http://localhost:%d" % port
72 l = [ self.s.create_empty_dirnode() for x in range(6) ]
73 d = defer.DeferredList(l)
75 self.public_root = res[0][1]
76 assert interfaces.IDirectoryNode.providedBy(self.public_root), res
77 self.public_url = "/uri/" + self.public_root.get_uri()
78 self.private_root = res[1][1]
82 self._foo_uri = foo.get_uri()
83 self._foo_readonly_uri = foo.get_readonly_uri()
84 # NOTE: we ignore the deferred on all set_uri() calls, because we
85 # know the fake nodes do these synchronously
86 self.public_root.set_uri("foo", foo.get_uri())
88 self.BAR_CONTENTS, n, self._bar_txt_uri = self.makefile(0)
89 foo.set_uri("bar.txt", self._bar_txt_uri)
90 foo.set_uri("empty", res[3][1].get_uri())
91 sub_uri = res[4][1].get_uri()
92 foo.set_uri("sub", sub_uri)
93 sub = self.s.create_node_from_uri(sub_uri)
95 _ign, n, blocking_uri = self.makefile(1)
96 foo.set_uri("blockingfile", blocking_uri)
98 _ign, n, baz_file = self.makefile(2)
99 sub.set_uri("baz.txt", baz_file)
101 _ign, n, self._bad_file_uri = self.makefile(3)
102 # this uri should not be downloadable
103 del FakeCHKFileNode.all_contents[self._bad_file_uri]
106 self.public_root.set_uri("reedownlee", rodir.get_readonly_uri())
107 rodir.set_uri("nor", baz_file)
112 # public/foo/blockingfile
115 # public/foo/sub/baz.txt
117 # public/reedownlee/nor
118 self.NEWFILE_CONTENTS = "newfile contents\n"
def makefile(self, number):
    """Build a fake CHK file node with a short, numbered payload.

    The payload is the string "contents of file <number>\\n". The node is
    created with create_chk_filenode() against self.s.

    Returns a 3-tuple: (payload string, node, the node's URI).
    """
    payload = "contents of file %s\n" % number
    node = create_chk_filenode(self.s, payload)
    node_uri = node.get_uri()
    return payload, node, node_uri
128 return self.s.stopService()
def failUnlessIsBarDotTxt(self, res):
    """Assert that `res` equals self.BAR_CONTENTS (the body of bar.txt)."""
    expected = self.BAR_CONTENTS
    self.failUnlessEqual(res, expected)
def failUnlessIsBarJSON(self, res):
    """Assert that `res` is the JSON description of the bar.txt file node.

    `res` is decoded with simplejson and must be a list whose first item is
    "filenode" and whose second item is a dict that has no "rw_uri" key, an
    "ro_uri" equal to self._bar_txt_uri, and a "size" equal to the length of
    self.BAR_CONTENTS.
    """
    parsed = simplejson.loads(res)
    self.failUnless(isinstance(parsed, list))
    self.failUnlessEqual(parsed[0], "filenode")
    info = parsed[1]
    self.failUnless(isinstance(info, dict))
    # an immutable file exposes no writeable URI
    self.failIf("rw_uri" in info)
    self.failUnlessEqual(info["ro_uri"], self._bar_txt_uri)
    expected_size = len(self.BAR_CONTENTS)
    self.failUnlessEqual(info["size"], expected_size)
def failUnlessIsFooJSON(self, res):
    """Assert that `res` is the JSON description of the "foo" directory.

    `res` is decoded with simplejson and must be a list of the form
    ["dirnode", {...}], where the dict carries "rw_uri" and "ro_uri" matching
    self._foo_uri / self._foo_readonly_uri and a "children" mapping holding
    exactly bar.txt, blockingfile, empty and sub.
    """
    parsed = simplejson.loads(res)
    self.failUnless(isinstance(parsed, list))
    self.failUnlessEqual(parsed[0], "dirnode", res)
    info = parsed[1]
    self.failUnless(isinstance(info, dict))
    # a mutable directory exposes a writeable URI
    self.failUnless("rw_uri" in info)
    self.failUnlessEqual(info["rw_uri"], self._foo_uri)
    self.failUnlessEqual(info["ro_uri"], self._foo_readonly_uri)

    children = info["children"]
    self.failUnlessEqual(sorted(children),
                         ["bar.txt", "blockingfile", "empty", "sub"])
    self.failUnlessEqual(children["sub"][0], "dirnode")
    bar_entry = children["bar.txt"]
    self.failUnlessEqual(bar_entry[0], "filenode")
    self.failUnlessEqual(bar_entry[1]["size"], len(self.BAR_CONTENTS))
    self.failUnlessEqual(bar_entry[1]["ro_uri"], self._bar_txt_uri)
def GET(self, urlpath, followRedirect=False):
    """Issue an HTTP GET for `urlpath` on the test web server.

    `urlpath` is appended to self.webish_url and `followRedirect` is passed
    straight through to client.getPage(), whose result is returned.
    Callers in this file attach callbacks to that result.
    """
    return client.getPage(self.webish_url + urlpath,
                          method="GET",
                          followRedirect=followRedirect)
def PUT(self, urlpath, data):
    """Issue an HTTP PUT of `data` to `urlpath` on the test web server.

    `urlpath` is appended to self.webish_url and `data` is sent as the
    request body (postdata). Returns the result of client.getPage().
    """
    return client.getPage(self.webish_url + urlpath,
                          method="PUT",
                          postdata=data)
def DELETE(self, urlpath):
    """Issue an HTTP DELETE for `urlpath` on the test web server.

    `urlpath` is appended to self.webish_url. Returns the result of
    client.getPage().
    """
    return client.getPage(self.webish_url + urlpath, method="DELETE")
172 def POST(self, urlpath, followRedirect=False, **fields):
173 url = self.webish_url + urlpath
174 sepbase = "boogabooga"
178 form.append('Content-Disposition: form-data; name="_charset"')
182 for name, value in fields.iteritems():
183 if isinstance(value, tuple):
184 filename, value = value
185 form.append('Content-Disposition: form-data; name="%s"; '
186 'filename="%s"' % (name, filename))
188 form.append('Content-Disposition: form-data; name="%s"' % name)
190 form.append(str(value))
193 body = "\r\n".join(form) + "\r\n"
194 headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
196 return client.getPage(url, method="POST", postdata=body,
197 headers=headers, followRedirect=followRedirect)
199 def shouldFail(self, res, expected_failure, which,
200 substring=None, response_substring=None):
201 if isinstance(res, failure.Failure):
202 res.trap(expected_failure)
204 self.failUnless(substring in str(res),
205 "substring '%s' not in '%s'"
206 % (substring, str(res)))
207 if response_substring:
208 self.failUnless(response_substring in res.value.response,
209 "respose substring '%s' not in '%s'"
210 % (response_substring, res.value.response))
212 self.fail("%s was supposed to raise %s, not get '%s'" %
213 (which, expected_failure, res))
215 def should404(self, res, which):
216 if isinstance(res, failure.Failure):
217 res.trap(error.Error)
218 self.failUnlessEqual(res.value.status, "404")
220 self.fail("%s was supposed to Error(404), not get '%s'" %
223 def shouldHTTPError(self, res, which, code=None, substring=None,
224 response_substring=None):
225 if isinstance(res, failure.Failure):
226 res.trap(error.Error)
228 self.failUnlessEqual(res.value.status, str(code))
230 self.failUnless(substring in str(res),
231 "substring '%s' not in '%s'"
232 % (substring, str(res)))
233 if response_substring:
234 self.failUnless(response_substring in res.value.response,
235 "respose substring '%s' not in '%s'"
236 % (response_substring, res.value.response))
238 self.fail("%s was supposed to Error(%s), not get '%s'" %
241 class Web(WebMixin, unittest.TestCase):
242 def test_create(self):
245 def test_welcome(self):
248 self.failUnless('Welcome To AllMyData' in res)
249 self.failUnless('Tahoe' in res)
250 self.failUnless('personal vdrive not available.' in res)
252 self.s.basedir = 'web/test_welcome'
253 fileutil.make_dirs("web/test_welcome")
254 self.ws.create_start_html("private_uri",
255 "web/test_welcome/start.html",
256 "web/test_welcome/node.url")
258 d.addCallback(_check)
260 self.failUnless('To view your personal private non-shared' in res)
261 self.failUnless('from your local filesystem:' in res)
262 self.failUnless(os.path.abspath('web/test_welcome/start.html')
264 d.addCallback(_check2)
def test_provisioning_math(self):
    # Spot-check provisioning.binomial(10, k) against known values,
    # including both ends (k=0, k=10) and the symmetric pair k=1 / k=9.
    known_values = [
        (0, 1),
        (1, 10),
        (2, 45),
        (9, 10),
        (10, 1),
    ]
    for k, expected in known_values:
        self.failUnlessEqual(provisioning.binomial(10, k), expected)
274 def test_provisioning(self):
275 d = self.GET("/provisioning/")
277 self.failUnless('Tahoe Provisioning Tool' in res)
278 fields = {'filled': True,
279 "num_users": int(50e3),
280 "files_per_user": 1000,
281 "space_per_user": int(1e9),
282 "sharing_ratio": 1.0,
283 "encoding_parameters": "3-of-10-5",
285 "ownership_mode": "A",
286 "download_rate": 100,
291 return self.POST("/provisioning/", **fields)
293 d.addCallback(_check)
295 self.failUnless('Tahoe Provisioning Tool' in res)
296 self.failUnless("Share space consumed: 167.01TB" in res)
298 fields = {'filled': True,
299 "num_users": int(50e6),
300 "files_per_user": 1000,
301 "space_per_user": int(5e9),
302 "sharing_ratio": 1.0,
303 "encoding_parameters": "25-of-100-50",
304 "num_servers": 30000,
305 "ownership_mode": "E",
306 "drive_failure_model": "U",
308 "download_rate": 1000,
313 return self.POST("/provisioning/", **fields)
314 d.addCallback(_check2)
316 self.failUnless("Share space consumed: huge!" in res)
317 fields = {'filled': True}
318 return self.POST("/provisioning/", **fields)
319 d.addCallback(_check3)
321 self.failUnless("Share space consumed:" in res)
322 d.addCallback(_check4)
def test_start_html(self):
    # Exercise create_start_html() and inspect the two files it is asked
    # to write: the start page and the node-URL file.
    def _read_text(path):
        # Close the handle explicitly instead of relying on garbage
        # collection, so the test does not leak open files.
        f = open(path, "r")
        try:
            return f.read()
        finally:
            f.close()

    fileutil.make_dirs("web")
    startfile = "web/start.html"
    nodeurlfile = "web/node.url"
    self.ws.create_start_html("private_uri", startfile, nodeurlfile)

    # The start page must exist and mention both the server's base URL and
    # the private-directory URL built from the URI passed in above.
    self.failUnless(os.path.exists(startfile))
    start_html = _read_text(startfile)
    self.failUnless(self.webish_url in start_html)
    private_url = self.webish_url + "/uri/private_uri"
    self.failUnless(private_url in start_html)

    # The node-URL file must exist and hold a localhost URL.
    self.failUnless(os.path.exists(nodeurlfile))
    nodeurl = _read_text(nodeurlfile).strip()
    self.failUnless(nodeurl.startswith("http://localhost"))
341 def test_GET_FILEURL(self):
342 d = self.GET(self.public_url + "/foo/bar.txt")
343 d.addCallback(self.failUnlessIsBarDotTxt)
346 def test_GET_FILEURL_save(self):
347 d = self.GET(self.public_url + "/foo/bar.txt?save=bar.txt")
348 # TODO: look at the headers, expect a Content-Disposition: attachment
350 d.addCallback(self.failUnlessIsBarDotTxt)
353 def test_GET_FILEURL_download(self):
354 d = self.GET(self.public_url + "/foo/bar.txt?t=download")
355 d.addCallback(self.failUnlessIsBarDotTxt)
358 def test_GET_FILEURL_missing(self):
359 d = self.GET(self.public_url + "/foo/missing")
360 d.addBoth(self.should404, "test_GET_FILEURL_missing")
363 def test_PUT_NEWFILEURL(self):
364 d = self.PUT(self.public_url + "/foo/new.txt", self.NEWFILE_CONTENTS)
365 # TODO: we lose the response code, so we can't check this
366 #self.failUnlessEqual(responsecode, 201)
367 d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "new.txt")
368 d.addCallback(lambda res:
369 self.failUnlessChildContentsAre(self._foo_node, "new.txt",
370 self.NEWFILE_CONTENTS))
373 def test_PUT_NEWFILEURL_replace(self):
374 d = self.PUT(self.public_url + "/foo/bar.txt", self.NEWFILE_CONTENTS)
375 # TODO: we lose the response code, so we can't check this
376 #self.failUnlessEqual(responsecode, 200)
377 d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "bar.txt")
378 d.addCallback(lambda res:
379 self.failUnlessChildContentsAre(self._foo_node, "bar.txt",
380 self.NEWFILE_CONTENTS))
383 def test_PUT_NEWFILEURL_no_replace(self):
384 d = self.PUT(self.public_url + "/foo/bar.txt?replace=false",
385 self.NEWFILE_CONTENTS)
386 d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_no_replace",
388 "There was already a child by that name, and you asked me "
392 def test_PUT_NEWFILEURL_mkdirs(self):
393 d = self.PUT(self.public_url + "/foo/newdir/new.txt", self.NEWFILE_CONTENTS)
395 d.addCallback(self.failUnlessURIMatchesChild, fn, "newdir/new.txt")
396 d.addCallback(lambda res: self.failIfNodeHasChild(fn, "new.txt"))
397 d.addCallback(lambda res: self.failUnlessNodeHasChild(fn, "newdir"))
398 d.addCallback(lambda res:
399 self.failUnlessChildContentsAre(fn, "newdir/new.txt",
400 self.NEWFILE_CONTENTS))
403 def test_PUT_NEWFILEURL_blocked(self):
404 d = self.PUT(self.public_url + "/foo/blockingfile/new.txt",
405 self.NEWFILE_CONTENTS)
406 d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_blocked",
408 "cannot create directory because there is a file in the way")
411 def test_DELETE_FILEURL(self):
412 d = self.DELETE(self.public_url + "/foo/bar.txt")
413 d.addCallback(lambda res:
414 self.failIfNodeHasChild(self._foo_node, "bar.txt"))
417 def test_DELETE_FILEURL_missing(self):
418 d = self.DELETE(self.public_url + "/foo/missing")
419 d.addBoth(self.should404, "test_DELETE_FILEURL_missing")
422 def test_DELETE_FILEURL_missing2(self):
423 d = self.DELETE(self.public_url + "/missing/missing")
424 d.addBoth(self.should404, "test_DELETE_FILEURL_missing2")
427 def test_GET_FILEURL_json(self):
428 # twisted.web.http.parse_qs ignores any query args without an '=', so
429 # I can't do "GET /path?json", I have to do "GET /path?t=json"
430 # instead. This may make it tricky to emulate the S3 interface
432 d = self.GET(self.public_url + "/foo/bar.txt?t=json")
433 d.addCallback(self.failUnlessIsBarJSON)
436 def test_GET_FILEURL_json_missing(self):
437 d = self.GET(self.public_url + "/foo/missing?json")
438 d.addBoth(self.should404, "test_GET_FILEURL_json_missing")
441 def disable_local_access(self, res=None):
442 self.ws.allow_local_access(False)
445 def test_GET_FILEURL_localfile(self):
446 localfile = os.path.abspath("web/GET_FILEURL_local file")
447 url = (self.public_url + "/foo/bar.txt?t=download&localfile=%s" %
448 urllib.quote(localfile))
449 fileutil.make_dirs("web")
452 self.failUnless(os.path.exists(localfile))
453 data = open(localfile, "rb").read()
454 self.failUnlessEqual(data, self.BAR_CONTENTS)
458 def test_GET_FILEURL_localfile_disabled(self):
459 localfile = os.path.abspath("web/GET_FILEURL_local file_disabled")
460 url = (self.public_url + "/foo/bar.txt?t=download&localfile=%s" %
461 urllib.quote(localfile))
462 fileutil.make_dirs("web")
463 self.disable_local_access()
465 d.addBoth(self.shouldFail, error.Error, "localfile disabled",
467 "local file access is disabled")
470 def test_GET_FILEURL_localfile_nonlocal(self):
471 # TODO: somehow pretend that we aren't local, and verify that the
472 # server refuses to write to local files, probably by changing the
473 # server's idea of what counts as "local".
474 old_LOCALHOST = webish.LOCALHOST
475 webish.LOCALHOST = "127.0.0.2"
476 localfile = os.path.abspath("web/GET_FILEURL_local file_nonlocal")
477 fileutil.make_dirs("web")
478 d = self.GET(self.public_url + "/foo/bar.txt?t=download&localfile=%s"
479 % urllib.quote(localfile))
480 d.addBoth(self.shouldFail, error.Error, "localfile non-local",
482 "localfile= or localdir= requires a local connection")
484 self.failIf(os.path.exists(localfile))
485 d.addCallback(_check)
487 webish.LOCALHOST = old_LOCALHOST
492 def test_GET_FILEURL_localfile_nonabsolute(self):
493 localfile = "web/nonabsolute/path"
494 fileutil.make_dirs("web/nonabsolute")
495 d = self.GET(self.public_url + "/foo/bar.txt?t=download&localfile=%s"
496 % urllib.quote(localfile))
497 d.addBoth(self.shouldFail, error.Error, "localfile non-absolute",
499 "localfile= or localdir= requires an absolute path")
501 self.failIf(os.path.exists(localfile))
502 d.addCallback(_check)
505 def test_PUT_NEWFILEURL_localfile(self):
506 localfile = os.path.abspath("web/PUT_NEWFILEURL_local file")
507 url = (self.public_url + "/foo/new.txt?t=upload&localfile=%s" %
508 urllib.quote(localfile))
509 fileutil.make_dirs("web")
510 f = open(localfile, "wb")
511 f.write(self.NEWFILE_CONTENTS)
513 d = self.PUT(url, "")
514 d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "new.txt")
515 d.addCallback(lambda res:
516 self.failUnlessChildContentsAre(self._foo_node, "new.txt",
517 self.NEWFILE_CONTENTS))
520 def test_PUT_NEWFILEURL_localfile_disabled(self):
521 localfile = os.path.abspath("web/PUT_NEWFILEURL_local file_disabled")
522 url = (self.public_url + "/foo/new.txt?t=upload&localfile=%s" %
523 urllib.quote(localfile))
524 fileutil.make_dirs("web")
525 f = open(localfile, "wb")
526 f.write(self.NEWFILE_CONTENTS)
528 self.disable_local_access()
529 d = self.PUT(url, "")
530 d.addBoth(self.shouldFail, error.Error, "put localfile disabled",
532 "local file access is disabled")
535 def test_PUT_NEWFILEURL_localfile_mkdirs(self):
536 localfile = os.path.abspath("web/PUT_NEWFILEURL_local file_mkdirs")
537 fileutil.make_dirs("web")
538 f = open(localfile, "wb")
539 f.write(self.NEWFILE_CONTENTS)
541 d = self.PUT(self.public_url + "/foo/newdir/new.txt?t=upload&localfile=%s"
542 % urllib.quote(localfile), "")
543 d.addCallback(self.failUnlessURIMatchesChild,
544 self._foo_node, "newdir/new.txt")
545 d.addCallback(lambda res:
546 self.failIfNodeHasChild(self._foo_node, "new.txt"))
547 d.addCallback(lambda res:
548 self.failUnlessNodeHasChild(self._foo_node, "newdir"))
549 d.addCallback(lambda res:
550 self.failUnlessChildContentsAre(self._foo_node,
552 self.NEWFILE_CONTENTS))
555 def test_GET_FILEURL_uri(self):
556 d = self.GET(self.public_url + "/foo/bar.txt?t=uri")
558 self.failUnlessEqual(res, self._bar_txt_uri)
559 d.addCallback(_check)
560 d.addCallback(lambda res:
561 self.GET(self.public_url + "/foo/bar.txt?t=readonly-uri"))
563 # for now, for files, uris and readonly-uris are the same
564 self.failUnlessEqual(res, self._bar_txt_uri)
565 d.addCallback(_check2)
568 def test_GET_FILEURL_uri_missing(self):
569 d = self.GET(self.public_url + "/foo/missing?t=uri")
570 d.addBoth(self.should404, "test_GET_FILEURL_uri_missing")
573 def test_GET_DIRURL(self):
574 # the addSlash means we get a redirect here
575 d = self.GET(self.public_url + "/foo", followRedirect=True)
577 # the FILE reference points to a URI, but it should end in bar.txt
578 self.failUnless(re.search(r'<td>'
579 '<a href="[^"]+bar.txt">bar.txt</a>'
582 '\s+<td>%d</td>' % len(self.BAR_CONTENTS)
584 self.failUnless(re.search(r'<td><a href="sub">sub</a></td>'
585 '\s+<td>DIR</td>', res))
586 d.addCallback(_check)
588 # look at a directory which is readonly
589 d.addCallback(lambda res:
590 self.GET(self.public_url + "/reedownlee", followRedirect=True))
592 self.failUnless("(readonly)" in res, res)
593 self.failIf("Upload a file" in res, res)
594 d.addCallback(_check2)
596 # and at a directory that contains a readonly directory
597 d.addCallback(lambda res:
598 self.GET(self.public_url, followRedirect=True))
600 self.failUnless(re.search(r'<td><a href="reedownlee">reedownlee</a>'
601 '</td>\s+<td>DIR-RO</td>', res))
602 d.addCallback(_check3)
606 def test_GET_DIRURL_json(self):
607 d = self.GET(self.public_url + "/foo?t=json")
608 d.addCallback(self.failUnlessIsFooJSON)
611 def test_GET_DIRURL_manifest(self):
612 d = self.GET(self.public_url + "/foo?t=manifest", followRedirect=True)
614 self.failUnless("Refresh Capabilities" in manifest)
618 def test_GET_DIRURL_uri(self):
619 d = self.GET(self.public_url + "/foo?t=uri")
621 self.failUnlessEqual(res, self._foo_uri)
622 d.addCallback(_check)
625 def test_GET_DIRURL_readonly_uri(self):
626 d = self.GET(self.public_url + "/foo?t=readonly-uri")
628 self.failUnlessEqual(res, self._foo_readonly_uri)
629 d.addCallback(_check)
632 def test_PUT_NEWDIRURL(self):
633 d = self.PUT(self.public_url + "/foo/newdir?t=mkdir", "")
634 d.addCallback(lambda res:
635 self.failUnlessNodeHasChild(self._foo_node, "newdir"))
636 d.addCallback(lambda res: self._foo_node.get("newdir"))
637 d.addCallback(self.failUnlessNodeKeysAre, [])
640 def test_PUT_NEWDIRURL_replace(self):
641 d = self.PUT(self.public_url + "/foo/sub?t=mkdir", "")
642 d.addCallback(lambda res:
643 self.failUnlessNodeHasChild(self._foo_node, "sub"))
644 d.addCallback(lambda res: self._foo_node.get("sub"))
645 d.addCallback(self.failUnlessNodeKeysAre, [])
648 def test_PUT_NEWDIRURL_no_replace(self):
649 d = self.PUT(self.public_url + "/foo/sub?t=mkdir&replace=false", "")
650 d.addBoth(self.shouldFail, error.Error, "PUT_NEWDIRURL_no_replace",
652 "There was already a child by that name, and you asked me "
654 d.addCallback(lambda res:
655 self.failUnlessNodeHasChild(self._foo_node, "sub"))
656 d.addCallback(lambda res: self._foo_node.get("sub"))
657 d.addCallback(self.failUnlessNodeKeysAre, ["baz.txt"])
660 def test_PUT_NEWDIRURL_mkdirs(self):
661 d = self.PUT(self.public_url + "/foo/subdir/newdir?t=mkdir", "")
662 d.addCallback(lambda res:
663 self.failIfNodeHasChild(self._foo_node, "newdir"))
664 d.addCallback(lambda res:
665 self.failUnlessNodeHasChild(self._foo_node, "subdir"))
666 d.addCallback(lambda res:
667 self._foo_node.get_child_at_path("subdir/newdir"))
668 d.addCallback(self.failUnlessNodeKeysAre, [])
671 def test_DELETE_DIRURL(self):
672 d = self.DELETE(self.public_url + "/foo")
673 d.addCallback(lambda res:
674 self.failIfNodeHasChild(self.public_root, "foo"))
677 def test_DELETE_DIRURL_missing(self):
678 d = self.DELETE(self.public_url + "/foo/missing")
679 d.addBoth(self.should404, "test_DELETE_DIRURL_missing")
680 d.addCallback(lambda res:
681 self.failUnlessNodeHasChild(self.public_root, "foo"))
684 def test_DELETE_DIRURL_missing2(self):
685 d = self.DELETE(self.public_url + "/missing")
686 d.addBoth(self.should404, "test_DELETE_DIRURL_missing2")
689 def test_walker(self):
691 def _visitor(path, node, metadata):
692 out.append((path, node))
693 return defer.succeed(None)
694 w = webish.DirnodeWalkerMixin()
695 d = w.walk(self.public_root, _visitor)
697 names = [path for (path,node) in out]
698 self.failUnlessEqual(sorted(names),
701 ('foo','blockingfile'),
704 ('foo','sub','baz.txt'),
706 ('reedownlee', 'nor'),
708 subindex = names.index( ('foo', 'sub') )
709 bazindex = names.index( ('foo', 'sub', 'baz.txt') )
710 self.failUnless(subindex < bazindex)
711 for path,node in out:
712 if path[-1] in ('bar.txt', 'blockingfile', 'baz.txt', 'nor'):
713 self.failUnless(interfaces.IFileNode.providedBy(node))
715 self.failUnless(interfaces.IDirectoryNode.providedBy(node))
716 d.addCallback(_check)
719 def test_GET_DIRURL_localdir(self):
720 localdir = os.path.abspath("web/GET_DIRURL_local dir")
721 fileutil.make_dirs("web")
722 d = self.GET(self.public_url + "/foo?t=download&localdir=%s" %
723 urllib.quote(localdir))
725 barfile = os.path.join(localdir, "bar.txt")
726 self.failUnless(os.path.exists(barfile))
727 data = open(barfile, "rb").read()
728 self.failUnlessEqual(data, self.BAR_CONTENTS)
729 blockingfile = os.path.join(localdir, "blockingfile")
730 self.failUnless(os.path.exists(blockingfile))
731 subdir = os.path.join(localdir, "sub")
732 self.failUnless(os.path.isdir(subdir))
733 d.addCallback(_check)
736 def test_GET_DIRURL_localdir_disabled(self):
737 localdir = os.path.abspath("web/GET_DIRURL_local dir_disabled")
738 fileutil.make_dirs("web")
739 self.disable_local_access()
740 d = self.GET(self.public_url + "/foo?t=download&localdir=%s" %
741 urllib.quote(localdir))
742 d.addBoth(self.shouldFail, error.Error, "localfile disabled",
744 "local file access is disabled")
747 def test_GET_DIRURL_localdir_nonabsolute(self):
748 localdir = "web/nonabsolute/dir path"
749 fileutil.make_dirs("web/nonabsolute")
750 d = self.GET(self.public_url + "/foo?t=download&localdir=%s" %
751 urllib.quote(localdir))
752 d.addBoth(self.shouldFail, error.Error, "localdir non-absolute",
754 "localfile= or localdir= requires an absolute path")
756 self.failIf(os.path.exists(localdir))
757 d.addCallback(_check)
760 def touch(self, localdir, filename):
761 path = os.path.join(localdir, filename)
763 f.write("contents of %s\n" % filename)
768 w = webish.DirnodeWalkerMixin()
769 def visitor(childpath, childnode, metadata):
771 d = w.walk(self.public_root, visitor)
774 def failUnlessNodeKeysAre(self, node, expected_keys):
776 def _check(children):
777 self.failUnlessEqual(sorted(children.keys()), sorted(expected_keys))
778 d.addCallback(_check)
780 def failUnlessNodeHasChild(self, node, name):
782 def _check(children):
783 self.failUnless(name in children)
784 d.addCallback(_check)
786 def failIfNodeHasChild(self, node, name):
788 def _check(children):
789 self.failIf(name in children)
790 d.addCallback(_check)
793 def failUnlessChildContentsAre(self, node, name, expected_contents):
794 d = node.get_child_at_path(name)
795 d.addCallback(lambda node: node.download_to_data())
796 def _check(contents):
797 self.failUnlessEqual(contents, expected_contents)
798 d.addCallback(_check)
801 def failUnlessChildURIIs(self, node, name, expected_uri):
802 d = node.get_child_at_path(name)
804 self.failUnlessEqual(child.get_uri(), expected_uri.strip())
805 d.addCallback(_check)
808 def failUnlessURIMatchesChild(self, got_uri, node, name):
809 d = node.get_child_at_path(name)
811 self.failUnlessEqual(got_uri.strip(), child.get_uri())
812 d.addCallback(_check)
def failUnlessCHKURIHasContents(self, got_uri, contents):
    """Assert that the fake CHK store maps `got_uri` to `contents`.

    Looks `got_uri` up in FakeCHKFileNode.all_contents; an unknown URI
    raises KeyError from that lookup rather than a test failure.
    """
    stored = FakeCHKFileNode.all_contents[got_uri]
    self.failUnless(stored == contents)
818 def test_PUT_NEWDIRURL_localdir(self):
819 localdir = os.path.abspath("web/PUT_NEWDIRURL_local dir")
820 # create some files there
821 fileutil.make_dirs(os.path.join(localdir, "one"))
822 fileutil.make_dirs(os.path.join(localdir, "one/sub"))
823 fileutil.make_dirs(os.path.join(localdir, "two"))
824 fileutil.make_dirs(os.path.join(localdir, "three"))
825 self.touch(localdir, "three/foo.txt")
826 self.touch(localdir, "three/bar.txt")
827 self.touch(localdir, "zap.zip")
829 d = self.PUT(self.public_url + "/newdir?t=upload&localdir=%s"
830 % urllib.quote(localdir), "")
831 pr = self.public_root
832 d.addCallback(lambda res: self.failUnlessNodeHasChild(pr, "newdir"))
833 d.addCallback(lambda res: pr.get("newdir"))
834 d.addCallback(self.failUnlessNodeKeysAre,
835 ["one", "two", "three", "zap.zip"])
836 d.addCallback(lambda res: pr.get_child_at_path("newdir/one"))
837 d.addCallback(self.failUnlessNodeKeysAre, ["sub"])
838 d.addCallback(lambda res: pr.get_child_at_path("newdir/three"))
839 d.addCallback(self.failUnlessNodeKeysAre, ["foo.txt", "bar.txt"])
840 d.addCallback(lambda res: pr.get_child_at_path("newdir/three/bar.txt"))
841 d.addCallback(lambda barnode: barnode.download_to_data())
842 d.addCallback(lambda contents:
843 self.failUnlessEqual(contents,
844 "contents of three/bar.txt\n"))
847 def test_PUT_NEWDIRURL_localdir_disabled(self):
848 localdir = os.path.abspath("web/PUT_NEWDIRURL_local dir_disabled")
849 # create some files there
850 fileutil.make_dirs(os.path.join(localdir, "one"))
851 fileutil.make_dirs(os.path.join(localdir, "one/sub"))
852 fileutil.make_dirs(os.path.join(localdir, "two"))
853 fileutil.make_dirs(os.path.join(localdir, "three"))
854 self.touch(localdir, "three/foo.txt")
855 self.touch(localdir, "three/bar.txt")
856 self.touch(localdir, "zap.zip")
858 self.disable_local_access()
859 d = self.PUT(self.public_url + "/newdir?t=upload&localdir=%s"
860 % urllib.quote(localdir), "")
861 d.addBoth(self.shouldFail, error.Error, "localfile disabled",
863 "local file access is disabled")
866 def test_PUT_NEWDIRURL_localdir_mkdirs(self):
867 localdir = os.path.abspath("web/PUT_NEWDIRURL_local dir_mkdirs")
868 # create some files there
869 fileutil.make_dirs(os.path.join(localdir, "one"))
870 fileutil.make_dirs(os.path.join(localdir, "one/sub"))
871 fileutil.make_dirs(os.path.join(localdir, "two"))
872 fileutil.make_dirs(os.path.join(localdir, "three"))
873 self.touch(localdir, "three/foo.txt")
874 self.touch(localdir, "three/bar.txt")
875 self.touch(localdir, "zap.zip")
877 d = self.PUT(self.public_url + "/foo/subdir/newdir?t=upload&localdir=%s"
878 % urllib.quote(localdir),
881 d.addCallback(lambda res: self.failUnlessNodeHasChild(fn, "subdir"))
882 d.addCallback(lambda res: fn.get_child_at_path("subdir/newdir"))
883 d.addCallback(self.failUnlessNodeKeysAre,
884 ["one", "two", "three", "zap.zip"])
885 d.addCallback(lambda res: fn.get_child_at_path("subdir/newdir/one"))
886 d.addCallback(self.failUnlessNodeKeysAre, ["sub"])
887 d.addCallback(lambda res: fn.get_child_at_path("subdir/newdir/three"))
888 d.addCallback(self.failUnlessNodeKeysAre, ["foo.txt", "bar.txt"])
889 d.addCallback(lambda res:
890 fn.get_child_at_path("subdir/newdir/three/bar.txt"))
891 d.addCallback(lambda barnode: barnode.download_to_data())
892 d.addCallback(lambda contents:
893 self.failUnlessEqual(contents,
894 "contents of three/bar.txt\n"))
897 def test_POST_upload(self):
898 d = self.POST(self.public_url + "/foo", t="upload",
899 file=("new.txt", self.NEWFILE_CONTENTS))
901 d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
902 d.addCallback(lambda res:
903 self.failUnlessChildContentsAre(fn, "new.txt",
904 self.NEWFILE_CONTENTS))
907 def test_POST_upload_no_link(self):
908 d = self.POST("/uri/", t="upload",
909 file=("new.txt", self.NEWFILE_CONTENTS))
910 d.addCallback(self.failUnlessCHKURIHasContents, self.NEWFILE_CONTENTS)
913 def test_POST_upload_no_link_whendone(self):
914 d = self.POST("/uri/", t="upload", when_done="/",
915 file=("new.txt", self.NEWFILE_CONTENTS))
916 d.addBoth(self.shouldRedirect, "/")
917 # XXX Test that resulting welcome page has a "most recent
918 # upload", the URI of which points to the file contents that
921 test_POST_upload_no_link_whendone.todo = "Not yet implemented."
923 def test_POST_upload_mutable(self):
924 # this creates a mutable file
925 d = self.POST(self.public_url + "/foo", t="upload", mutable="true",
926 file=("new.txt", self.NEWFILE_CONTENTS))
928 d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
929 d.addCallback(lambda res:
930 self.failUnlessChildContentsAre(fn, "new.txt",
931 self.NEWFILE_CONTENTS))
932 d.addCallback(lambda res: self._foo_node.get("new.txt"))
934 self.failUnless(IMutableFileNode.providedBy(newnode))
935 self.failUnless(newnode.is_mutable())
936 self.failIf(newnode.is_readonly())
937 self._mutable_uri = newnode.get_uri()
940 # now upload it again and make sure that the URI doesn't change
941 NEWER_CONTENTS = self.NEWFILE_CONTENTS + "newer\n"
942 d.addCallback(lambda res:
943 self.POST(self.public_url + "/foo", t="upload",
945 file=("new.txt", NEWER_CONTENTS)))
946 d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
947 d.addCallback(lambda res:
948 self.failUnlessChildContentsAre(fn, "new.txt",
950 d.addCallback(lambda res: self._foo_node.get("new.txt"))
952 self.failUnless(IMutableFileNode.providedBy(newnode))
953 self.failUnless(newnode.is_mutable())
954 self.failIf(newnode.is_readonly())
955 self.failUnlessEqual(self._mutable_uri, newnode.get_uri())
958 # finally list the directory, since mutable files are displayed
961 d.addCallback(lambda res:
962 self.GET(self.public_url + "/foo",
963 followRedirect=True))
964 def _check_page(res):
965 # TODO: assert more about the contents
966 self.failUnless("Overwrite" in res)
967 self.failUnless("Choose new file:" in res)
969 d.addCallback(_check_page)
971 # test that clicking on the "overwrite" button works
972 EVEN_NEWER_CONTENTS = NEWER_CONTENTS + "even newer\n"
973 OVERWRITE_FORM_RE=re.compile('<form action="(.*)" method="post" enctype="multipart/form-data"><fieldset><input type="hidden" name="t" value="overwrite" /><input type="hidden" name="name" value="(.*)" /><input type="hidden" name="when_done" value="(.*)" /><legend class="freeform-form-label">Overwrite</legend>Choose new file: <input type="file" class="freeform-input-file" name="file" /> <input type="submit" value="Overwrite" /></fieldset></form>', re.I)
974 def _parse_overwrite_form_and_submit(res):
975 mo = OVERWRITE_FORM_RE.search(res)
977 formaction=mo.group(1)
979 formwhendone=mo.group(3)
981 if formaction == ".":
982 formaction = self.public_url + "/foo"
983 return self.POST(formaction, t="overwrite", name=formname, when_done=formwhendone, file=("new.txt", EVEN_NEWER_CONTENTS), followRedirect=False)
984 d.addCallback(_parse_overwrite_form_and_submit)
985 d.addBoth(self.shouldRedirect, urllib.quote(self.public_url + "/foo/"))
986 d.addCallback(lambda res:
987 self.failUnlessChildContentsAre(fn, "new.txt",
988 EVEN_NEWER_CONTENTS))
989 d.addCallback(lambda res: self._foo_node.get("new.txt"))
991 self.failUnless(IMutableFileNode.providedBy(newnode))
992 self.failUnless(newnode.is_mutable())
993 self.failIf(newnode.is_readonly())
994 self.failUnlessEqual(self._mutable_uri, newnode.get_uri())
def test_POST_upload_replace(self):
    # POST t=upload with the name "bar.txt" into /foo, then check the
    # directory entry against the response and against the new contents.
    d = self.POST(self.public_url + "/foo", t="upload",
                  file=("bar.txt", self.NEWFILE_CONTENTS))
    # the callbacks below need the directory node that was POSTed to
    fn = self._foo_node
    d.addCallback(self.failUnlessURIMatchesChild, fn, "bar.txt")
    d.addCallback(lambda res:
                  self.failUnlessChildContentsAre(fn, "bar.txt",
                                                  self.NEWFILE_CONTENTS))
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_upload_no_replace_queryarg(self):
    # POST t=upload for "bar.txt" with ?replace=false in the URL: the
    # request must fail and the old bar.txt must still be served.
    d = self.POST(self.public_url + "/foo?replace=false", t="upload",
                  file=("bar.txt", self.NEWFILE_CONTENTS))
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error,
              "POST_upload_no_replace_queryarg",
              "There was already a child by that name, and you asked me "
              "to not replace it")
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
    d.addCallback(self.failUnlessIsBarDotTxt)
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_upload_no_replace_field(self):
    # Same as the query-arg variant, but replace="false" travels as a
    # form field: the upload must fail and bar.txt must be unchanged.
    d = self.POST(self.public_url + "/foo", t="upload", replace="false",
                  file=("bar.txt", self.NEWFILE_CONTENTS))
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error, "POST_upload_no_replace_field",
              "There was already a child by that name, and you asked me "
              "to not replace it")
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
    d.addCallback(self.failUnlessIsBarDotTxt)
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_upload_whendone(self):
    # POST t=upload with when_done="/THERE": expect a redirect to that
    # target, and the uploaded child to hold the new contents.
    d = self.POST(self.public_url + "/foo", t="upload", when_done="/THERE",
                  file=("new.txt", self.NEWFILE_CONTENTS))
    d.addBoth(self.shouldRedirect, "/THERE")
    # the callback below needs the directory node that was POSTed to
    fn = self._foo_node
    d.addCallback(lambda res:
                  self.failUnlessChildContentsAre(fn, "new.txt",
                                                  self.NEWFILE_CONTENTS))
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_upload_named(self):
    # POST t=upload where the child name comes from the name= field and
    # the file field carries only the contents.
    fn = self._foo_node
    d = self.POST(self.public_url + "/foo", t="upload",
                  name="new.txt", file=self.NEWFILE_CONTENTS)
    d.addCallback(self.failUnlessURIMatchesChild, fn, "new.txt")
    d.addCallback(lambda res:
                  self.failUnlessChildContentsAre(fn, "new.txt",
                                                  self.NEWFILE_CONTENTS))
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_upload_named_badfilename(self):
    # POST t=upload with a name= containing slashes: the request must
    # fail and /foo must keep exactly its previous children.
    d = self.POST(self.public_url + "/foo", t="upload",
                  name="slashes/are/bad.txt", file=self.NEWFILE_CONTENTS)
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error,
              "test_POST_upload_named_badfilename",
              "name= may not contain a slash",
              )
    # make sure that nothing was added
    # NOTE(review): the tail of this list was truncated; "empty" and "sub"
    # are the other /foo children used by sibling tests -- confirm that
    # the list is complete against the fixture set-up.
    d.addCallback(lambda res:
                  self.failUnlessNodeKeysAre(self._foo_node,
                                             ["bar.txt", "blockingfile",
                                              "empty", "sub"]))
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_mkdir(self): # return value?
    # POST t=mkdir for a new name, then check the new child has no keys.
    d = self.POST(self.public_url + "/foo", t="mkdir", name="newdir")
    d.addCallback(lambda res: self._foo_node.get("newdir"))
    d.addCallback(self.failUnlessNodeKeysAre, [])
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_mkdir_replace(self): # return value?
    # POST t=mkdir using the name "sub", then check that the "sub" child
    # now has no keys.
    d = self.POST(self.public_url + "/foo", t="mkdir", name="sub")
    d.addCallback(lambda res: self._foo_node.get("sub"))
    d.addCallback(self.failUnlessNodeKeysAre, [])
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_mkdir_no_replace_queryarg(self): # return value?
    # POST t=mkdir for "sub" with ?replace=false: the request must fail
    # and "sub" must still contain baz.txt.
    d = self.POST(self.public_url + "/foo?replace=false", t="mkdir", name="sub")
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error,
              "POST_mkdir_no_replace_queryarg",
              "There was already a child by that name, and you asked me "
              "to not replace it")
    d.addCallback(lambda res: self._foo_node.get("sub"))
    d.addCallback(self.failUnlessNodeKeysAre, ["baz.txt"])
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_mkdir_no_replace_field(self): # return value?
    # Form-field variant of the no-replace mkdir test: the request must
    # fail and "sub" must still contain baz.txt.
    # NOTE(review): the end of the POST call was truncated; replace="false"
    # is restored to match the test name and the sibling *_no_replace_field
    # tests -- confirm.
    d = self.POST(self.public_url + "/foo", t="mkdir", name="sub",
                  replace="false")
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error, "POST_mkdir_no_replace_field",
              "There was already a child by that name, and you asked me "
              "to not replace it")
    d.addCallback(lambda res: self._foo_node.get("sub"))
    d.addCallback(self.failUnlessNodeKeysAre, ["baz.txt"])
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_mkdir_whendone_field(self):
    # POST t=mkdir with when_done passed as a form field: expect a
    # redirect to it and an empty new directory.
    d = self.POST(self.public_url + "/foo",
                  t="mkdir", name="newdir", when_done="/THERE")
    d.addBoth(self.shouldRedirect, "/THERE")
    d.addCallback(lambda res: self._foo_node.get("newdir"))
    d.addCallback(self.failUnlessNodeKeysAre, [])
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_mkdir_whendone_queryarg(self):
    # POST t=mkdir with when_done passed in the query string: expect a
    # redirect to it and an empty new directory.
    d = self.POST(self.public_url + "/foo?when_done=/THERE",
                  t="mkdir", name="newdir")
    d.addBoth(self.shouldRedirect, "/THERE")
    d.addCallback(lambda res: self._foo_node.get("newdir"))
    d.addCallback(self.failUnlessNodeKeysAre, [])
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_put_uri(self):
    # POST t=uri to attach a freshly made file under a new name, then
    # check the child against the response and the file's contents.
    contents, n, newuri = self.makefile(8)
    d = self.POST(self.public_url + "/foo", t="uri", name="new.txt", uri=newuri)
    d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "new.txt")
    # the expected data is the contents returned by makefile() above
    d.addCallback(lambda res:
                  self.failUnlessChildContentsAre(self._foo_node, "new.txt",
                                                  contents))
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_put_uri_replace(self):
    # POST t=uri using the name "bar.txt", then check the child against
    # the response and the new file's contents.
    contents, n, newuri = self.makefile(8)
    d = self.POST(self.public_url + "/foo", t="uri", name="bar.txt", uri=newuri)
    d.addCallback(self.failUnlessURIMatchesChild, self._foo_node, "bar.txt")
    # the expected data is the contents returned by makefile() above
    d.addCallback(lambda res:
                  self.failUnlessChildContentsAre(self._foo_node, "bar.txt",
                                                  contents))
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_put_uri_no_replace_queryarg(self):
    # POST t=uri for "bar.txt" with ?replace=false: the request must fail
    # and the old bar.txt must still be served.
    contents, n, newuri = self.makefile(8)
    d = self.POST(self.public_url + "/foo?replace=false", t="uri",
                  name="bar.txt", uri=newuri)
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error,
              "POST_put_uri_no_replace_queryarg",
              "There was already a child by that name, and you asked me "
              "to not replace it")
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
    d.addCallback(self.failUnlessIsBarDotTxt)
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_put_uri_no_replace_field(self):
    # Form-field variant: POST t=uri for "bar.txt" with replace="false"
    # must fail and leave the old bar.txt in place.
    contents, n, newuri = self.makefile(8)
    d = self.POST(self.public_url + "/foo", t="uri", replace="false",
                  name="bar.txt", uri=newuri)
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error,
              "POST_put_uri_no_replace_field",
              "There was already a child by that name, and you asked me "
              "to not replace it")
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/bar.txt"))
    d.addCallback(self.failUnlessIsBarDotTxt)
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_delete(self):
    # POST t=delete for bar.txt, then list /foo and check it is gone.
    d = self.POST(self.public_url + "/foo", t="delete", name="bar.txt")
    d.addCallback(lambda res: self._foo_node.list())
    def _check(children):
        self.failIf("bar.txt" in children)
    d.addCallback(_check)
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_rename_file(self):
    # POST t=rename bar.txt -> wibble.txt, then check the old name is
    # gone and the new name serves the bar.txt body and JSON.
    d = self.POST(self.public_url + "/foo", t="rename",
                  from_name="bar.txt", to_name='wibble.txt')
    d.addCallback(lambda res:
                  self.failIfNodeHasChild(self._foo_node, "bar.txt"))
    d.addCallback(lambda res:
                  self.failUnlessNodeHasChild(self._foo_node, "wibble.txt"))
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt"))
    d.addCallback(self.failUnlessIsBarDotTxt)
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/wibble.txt?t=json"))
    d.addCallback(self.failUnlessIsBarJSON)
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_rename_file_replace(self):
    # rename a file and replace a directory with it
    d = self.POST(self.public_url + "/foo", t="rename",
                  from_name="bar.txt", to_name='empty')
    d.addCallback(lambda res:
                  self.failIfNodeHasChild(self._foo_node, "bar.txt"))
    d.addCallback(lambda res:
                  self.failUnlessNodeHasChild(self._foo_node, "empty"))
    # "empty" must now serve the bar.txt body and JSON
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty"))
    d.addCallback(self.failUnlessIsBarDotTxt)
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
    d.addCallback(self.failUnlessIsBarJSON)
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_rename_file_no_replace_queryarg(self):
    # try to rename a file onto the existing "empty" directory with
    # ?replace=false: the request must fail and "empty" must stay an
    # empty directory
    d = self.POST(self.public_url + "/foo?replace=false", t="rename",
                  from_name="bar.txt", to_name='empty')
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error,
              "POST_rename_file_no_replace_queryarg",
              "There was already a child by that name, and you asked me "
              "to not replace it")
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
    d.addCallback(self.failUnlessIsEmptyJSON)
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_rename_file_no_replace_field(self):
    # try to rename a file onto the existing "empty" directory with
    # replace="false" as a form field: the request must fail and "empty"
    # must stay an empty directory
    d = self.POST(self.public_url + "/foo", t="rename", replace="false",
                  from_name="bar.txt", to_name='empty')
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error,
              "POST_rename_file_no_replace_field",
              "There was already a child by that name, and you asked me "
              "to not replace it")
    d.addCallback(lambda res: self.GET(self.public_url + "/foo/empty?t=json"))
    d.addCallback(self.failUnlessIsEmptyJSON)
    # return the Deferred so the runner waits for the checks above
    return d
def failUnlessIsEmptyJSON(self, res):
    # Assert that `res` is the JSON form of a directory with no children:
    # element 0 is "dirnode" and element 1 has an empty "children" entry.
    parsed = simplejson.loads(res)
    nodetype = parsed[0]
    self.failUnlessEqual(nodetype, "dirnode", parsed)
    children = parsed[1]["children"]
    self.failUnlessEqual(len(children), 0)
def test_POST_rename_file_slash_fail(self):
    # t=rename must reject a slash in either to_name= or from_name=, and
    # must leave the directory tree untouched in both cases.
    d = self.POST(self.public_url + "/foo", t="rename",
                  from_name="bar.txt", to_name='kirk/spock.txt')
    # NOTE(review): in both shouldFail calls below, a line of the argument
    # list looks to have been lost between the test name and the message
    # (possibly the expected status text) -- confirm against shouldFail's
    # signature.
    d.addBoth(self.shouldFail, error.Error,
              "test_POST_rename_file_slash_fail",
              "to_name= may not contain a slash",
              )
    d.addCallback(lambda res:
                  self.failUnlessNodeHasChild(self._foo_node, "bar.txt"))
    d.addCallback(lambda res: self.POST(self.public_url, t="rename",
                  from_name="foo/bar.txt", to_name='george.txt'))
    d.addBoth(self.shouldFail, error.Error,
              "test_POST_rename_file_slash_fail",
              "from_name= may not contain a slash",
              )
    d.addCallback(lambda res:
                  self.failUnlessNodeHasChild(self.public_root, "foo"))
    d.addCallback(lambda res:
                  self.failIfNodeHasChild(self.public_root, "george.txt"))
    d.addCallback(lambda res:
                  self.failUnlessNodeHasChild(self._foo_node, "bar.txt"))
    d.addCallback(lambda res: self.GET(self.public_url + "/foo?t=json"))
    d.addCallback(self.failUnlessIsFooJSON)
    # return the Deferred so the runner waits for the checks above
    return d
def test_POST_rename_dir(self):
    # POST t=rename foo -> plunk on the public root, then check the old
    # name is gone and the new name serves the foo JSON.
    d = self.POST(self.public_url, t="rename",
                  from_name="foo", to_name='plunk')
    d.addCallback(lambda res:
                  self.failIfNodeHasChild(self.public_root, "foo"))
    d.addCallback(lambda res:
                  self.failUnlessNodeHasChild(self.public_root, "plunk"))
    d.addCallback(lambda res: self.GET(self.public_url + "/plunk?t=json"))
    d.addCallback(self.failUnlessIsFooJSON)
    # return the Deferred so the runner waits for the checks above
    return d
def shouldRedirect(self, res, target):
    # Check that `res` is a Failure wrapping error.PageRedirect and that
    # its location is webish_url + target. A non-Failure result means a
    # page was served instead, which fails the test.
    got_failure = isinstance(res, failure.Failure)
    if not got_failure:
        message = ("we were expecting to get redirected to %s, not get an"
                   " actual page: %s" % (target, res))
        self.fail(message)
    res.trap(error.PageRedirect)
    # the PageRedirect does not seem to capture the uri= query arg
    # properly, so we can't check for it.
    expected_location = self.webish_url + target
    self.failUnlessEqual(res.value.location, expected_location)
def test_GET_URI_form(self):
    # GET /uri?uri=... is expected to redirect to /uri/$URI with the
    # other query args carried over; with followRedirect it should serve
    # the file or directory itself.
    base = "/uri?uri=%s" % self._bar_txt_uri
    # this is supposed to give us a redirect to /uri/$URI, plus arguments
    targetbase = "/uri/%s" % urllib.quote(self._bar_txt_uri)
    # start the chain with the plain form URL
    d = self.GET(base)
    d.addBoth(self.shouldRedirect, targetbase)
    d.addCallback(lambda res: self.GET(base+"&filename=bar.txt"))
    d.addBoth(self.shouldRedirect, targetbase+"?filename=bar.txt")
    d.addCallback(lambda res: self.GET(base+"&t=json"))
    d.addBoth(self.shouldRedirect, targetbase+"?t=json")
    d.addCallback(self.log, "about to get file by uri")
    d.addCallback(lambda res: self.GET(base, followRedirect=True))
    d.addCallback(self.failUnlessIsBarDotTxt)
    d.addCallback(self.log, "got file by uri, about to get dir by uri")
    d.addCallback(lambda res: self.GET("/uri?uri=%s&t=json" % self._foo_uri,
                                       followRedirect=True))
    d.addCallback(self.failUnlessIsFooJSON)
    d.addCallback(self.log, "got dir by uri")
    # return the Deferred so the runner waits for the checks above
    return d
def test_GET_rename_form(self):
    # GET the t=rename-form page for bar.txt and check that its when_done
    # and from_name fields carry the expected values.
    d = self.GET(self.public_url + "/foo?t=rename-form&name=bar.txt",
                 followRedirect=True) # XXX [ ] todo: figure out why '.../foo' doesn't work
    def _check(res):
        # build the pattern once; it is also passed as the failure message
        when_done_re = (r'name="when_done" value=".*%s/foo/'
                        % (urllib.quote(self.public_url),))
        self.failUnless(re.search(when_done_re, res), (when_done_re, res,))
        self.failUnless(re.search(r'name="from_name" value="bar\.txt"', res))
    d.addCallback(_check)
    # return the Deferred so the runner waits for the checks above
    return d
def log(self, res, msg):
    # Callback-chain helper: record `msg` and pass `res` through unchanged
    # so it can sit between other callbacks.
    # NOTE(review): the body was missing from the recovered text; it is
    # rebuilt from the file's twisted.python `log` import and from how
    # callers chain this method -- confirm.
    #print "MSG: %s RES: %s" % (msg, res)
    log.msg(msg)
    return res
def test_GET_URI_URL(self):
    # GET /uri/$URI (slashes written as "!") with and without the
    # filename= and save= arguments; each must serve the bar.txt body.
    base = "/uri/%s" % self._bar_txt_uri.replace("/","!")
    # start the chain with the bare URL
    d = self.GET(base)
    d.addCallback(self.failUnlessIsBarDotTxt)
    d.addCallback(lambda res: self.GET(base+"?filename=bar.txt"))
    d.addCallback(self.failUnlessIsBarDotTxt)
    d.addCallback(lambda res: self.GET(base+"?filename=bar.txt&save=true"))
    d.addCallback(self.failUnlessIsBarDotTxt)
    # return the Deferred so the runner waits for the checks above
    return d
def test_GET_URI_URL_dir(self):
    # GET /uri/$URI?t=json for the foo URI and check the JSON.
    base = "/uri/%s?t=json" % self._foo_uri.replace("/","!")
    d = self.GET(base)
    d.addCallback(self.failUnlessIsFooJSON)
    # return the Deferred so the runner waits for the check above
    return d
def test_GET_URI_URL_missing(self):
    # GET /uri/$URI for the bad-file URI: expect an http.GONE error whose
    # response mentions NotEnoughPeersError.
    base = "/uri/%s" % self._bad_file_uri.replace("/","!")
    d = self.GET(base)
    d.addBoth(self.shouldHTTPError, "test_GET_URI_URL_missing",
              http.GONE, response_substring="NotEnoughPeersError")
    # TODO: how can we exercise both sides of WebDownloadTarget.fail
    # here? we must arrange for a download to fail after target.open()
    # has been called, and then inspect the response to see that it is
    # shorter than we expected.
    # return the Deferred so the runner waits for the check above
    return d
def test_PUT_NEWFILEURL_uri(self):
    # PUT ?t=uri with a file URI as the body to attach it as new.txt; the
    # response must echo the URI and the child must hold the contents.
    contents, n, new_uri = self.makefile(8)
    d = self.PUT(self.public_url + "/foo/new.txt?t=uri", new_uri)
    d.addCallback(lambda res: self.failUnlessEqual(res.strip(), new_uri))
    # the expected data is the contents returned by makefile() above
    d.addCallback(lambda res:
                  self.failUnlessChildContentsAre(self._foo_node, "new.txt",
                                                  contents))
    # return the Deferred so the runner waits for the checks above
    return d
def test_PUT_NEWFILEURL_uri_replace(self):
    # PUT ?t=uri onto the name "bar.txt"; the response must echo the URI
    # and the child must hold the new file's contents.
    contents, n, new_uri = self.makefile(8)
    d = self.PUT(self.public_url + "/foo/bar.txt?t=uri", new_uri)
    d.addCallback(lambda res: self.failUnlessEqual(res.strip(), new_uri))
    # the expected data is the contents returned by makefile() above
    d.addCallback(lambda res:
                  self.failUnlessChildContentsAre(self._foo_node, "bar.txt",
                                                  contents))
    # return the Deferred so the runner waits for the checks above
    return d
def test_PUT_NEWFILEURL_uri_no_replace(self):
    # PUT ?t=uri&replace=false onto the name "bar.txt": the request must
    # fail.
    contents, n, new_uri = self.makefile(8)
    d = self.PUT(self.public_url + "/foo/bar.txt?t=uri&replace=false", new_uri)
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error, "PUT_NEWFILEURL_uri_no_replace",
              "There was already a child by that name, and you asked me "
              "to not replace it")
    # return the Deferred so the runner waits for the check above
    return d
def test_PUT_NEWFILE_URI(self):
    # PUT file contents to /uri: the response is used as the new file's
    # URI, which must be recorded in FakeCHKFileNode.all_contents and must
    # serve the same bytes when fetched back.
    file_contents = "New file contents here\n"
    d = self.PUT("/uri", file_contents)
    def _check(uri):
        self.failUnless(uri in FakeCHKFileNode.all_contents)
        self.failUnlessEqual(FakeCHKFileNode.all_contents[uri],
                             file_contents)
        # fetch the file back through its URI (slashes written as "!")
        return self.GET("/uri/%s" % uri.replace("/","!"))
    d.addCallback(_check)
    def _check2(res):
        self.failUnlessEqual(res, file_contents)
    d.addCallback(_check2)
    # return the Deferred so the runner waits for the checks above
    return d
def test_PUT_NEWFILE_URI_only_PUT(self):
    # PUT /uri with an unrecognised t= value must be rejected.
    d = self.PUT("/uri?t=bogus", "")
    # NOTE(review): a line of this argument list looks to have been lost
    # between the test name and the message (possibly the expected status
    # text) -- confirm against shouldFail's signature.
    d.addBoth(self.shouldFail, error.Error,
              "PUT_NEWFILE_URI_only_PUT",
              "/uri only accepts PUT and PUT?t=mkdir")
    # return the Deferred so the runner waits for the check above
    return d
def test_PUT_NEWDIR_URI(self):
    # PUT /uri?t=mkdir: the response is used as the new directory's URI;
    # the node built from it must have no keys, and its JSON must describe
    # an empty directory.
    d = self.PUT("/uri?t=mkdir", "")
    def _check(uri):
        n = self.s.create_node_from_uri(uri.strip())
        d2 = self.failUnlessNodeKeysAre(n, [])
        d2.addCallback(lambda res:
                       self.GET("/uri/%s?t=json" % uri.replace("/","!")))
        # chain d2 so the JSON response reaches the next callback
        return d2
    d.addCallback(_check)
    d.addCallback(self.failUnlessIsEmptyJSON)
    # return the Deferred so the runner waits for the checks above
    return d
1398 def test_POST_check(self):
1399 d = self.POST(self.public_url + "/foo", t="check", name="bar.txt")
1401 # this returns a string form of the results, which are probably
1402 # None since we're using fake filenodes.
1403 # TODO: verify that the check actually happened, by changing
1404 # FakeCHKFileNode to count how many times .check() has been
1407 d.addCallback(_done)