self.all_contents[self.storage_index] = modifier(old_contents)
return None
+ def download(self, target):
+ if self.storage_index not in self.all_contents:
+ f = failure.Failure(NotEnoughSharesError())
+ target.fail(f)
+ return defer.fail(f)
+ data = self.all_contents[self.storage_index]
+ target.open(len(data))
+ target.write(data)
+ target.close()
+ return defer.maybeDeferred(target.finish)
+ def download_to_data(self):
+ if self.storage_index not in self.all_contents:
+ return defer.fail(NotEnoughSharesError())
+ data = self.all_contents[self.storage_index]
+ return defer.succeed(data)
def make_mutable_file_uri():
return uri.WriteableSSKFileURI(writekey=os.urandom(16),
d.addCallback(self.failUnlessIsBarDotTxt)
return d
+ def test_GET_unhandled_URI_named(self):
+ contents, n, newuri = self.makefile(12)
+ verifier_cap = n.get_verifier().to_string()
+ base = "/file/%s" % urllib.quote(verifier_cap)
+ # client.create_node_from_uri() can't handle verify-caps
+ d = self.shouldFail2(error.Error, "GET_unhandled_URI_named",
+ "400 Bad Request",
+ "is not a valid file- or directory- cap",
+ self.GET, base)
+ return d
+
def test_GET_FILEURL_save(self):
d = self.GET(self.public_url + "/foo/bar.txt?filename=bar.txt&save=true")
# TODO: look at the headers, expect a Content-Disposition: attachment
self.NEWFILE_CONTENTS))
return d
+ def test_PUT_NEWFILEURL_bad_t(self):
+ d = self.shouldFail2(error.Error, "PUT_bad_t", "400 Bad Request",
+ "PUT to a file: bad t=bogus",
+ self.PUT, self.public_url + "/foo/bar.txt?t=bogus",
+ "contents")
+ return d
+
def test_PUT_NEWFILEURL_no_replace(self):
d = self.PUT(self.public_url + "/foo/bar.txt?replace=false",
self.NEWFILE_CONTENTS)
file=("new.txt", self.NEWFILE_CONTENTS))
def _check(new_uri):
new_uri = new_uri.strip()
+ self.new_uri = new_uri
u = IURI(new_uri)
self.failUnless(IMutableFileURI.providedBy(u))
self.failUnless(u.storage_index in FakeMutableFileNode.all_contents)
d.addCallback(_check)
def _check2(data):
self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
+ return self.GET("/uri/%s" % urllib.quote(self.new_uri))
d.addCallback(_check2)
+ def _check3(data):
+ self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
+ return self.GET("/file/%s" % urllib.quote(self.new_uri))
+ d.addCallback(_check3)
+ def _check4(data):
+ self.failUnlessEqual(data, self.NEWFILE_CONTENTS)
+ d.addCallback(_check4)
return d
def test_POST_upload_mutable(self):
self.failUnlessEqual(self._mutable_uri, newnode.get_uri())
d.addCallback(_got2)
+ # upload a second time, using PUT instead of POST
+ NEW2_CONTENTS = NEWER_CONTENTS + "overwrite with PUT\n"
+ d.addCallback(lambda res:
+ self.PUT(self.public_url + "/foo/new.txt", NEW2_CONTENTS))
+ d.addCallback(self.failUnlessURIMatchesChild, fn, u"new.txt")
+ d.addCallback(lambda res:
+ self.failUnlessMutableChildContentsAre(fn, u"new.txt",
+ NEW2_CONTENTS))
+
# finally list the directory, since mutable files are displayed
# differently