from cStringIO import StringIO
from twisted.internet import defer, reactor, protocol, error
from twisted.application import service, internet
-from twisted.web.client import getPage, downloadPage
+from twisted.web import client as tw_client
from allmydata import client, introducer_and_vdrive
from allmydata.scripts import create_node
from allmydata.util import testutil, fileutil
from foolscap import eventual
from twisted.python import log
+class StallableHTTPGetterDiscarder(tw_client.HTTPPageGetter):
+ full_speed_ahead = False
+ _bytes_so_far = 0
+ stalled = None
+ def handleResponsePart(self, data):
+ self._bytes_so_far += len(data)
+ if not self.factory.do_stall:
+ return
+ if self.full_speed_ahead:
+ return
+ if self._bytes_so_far > 1e6+100:
+ if not self.stalled:
+ print "STALLING"
+ self.transport.pauseProducing()
+ self.stalled = reactor.callLater(10.0, self._resume_speed)
+ def _resume_speed(self):
+ print "RESUME SPEED"
+ self.stalled = None
+ self.full_speed_ahead = True
+ self.transport.resumeProducing()
+ def handleResponseEnd(self):
+ if self.stalled:
+ print "CANCEL"
+ self.stalled.cancel()
+ self.stalled = None
+ return tw_client.HTTPPageGetter.handleResponseEnd(self)
+
+class StallableDiscardingHTTPClientFactory(tw_client.HTTPClientFactory):
+ protocol = StallableHTTPGetterDiscarder
+
+def discardPage(url, stall=False, *args, **kwargs):
+ """Start fetching the URL, but stall our pipe after the first 1MB.
+ Wait 10 seconds, then resume downloading (and discarding) everything.
+ """
+ # adapted from twisted.web.client.getPage . We can't just wrap or
+ # subclass because it provides no way to override the HTTPClientFactory
+ # that it creates.
+ scheme, host, port, path = tw_client._parse(url)
+ factory = StallableDiscardingHTTPClientFactory(url, *args, **kwargs)
+ factory.do_stall = stall
+ assert scheme == 'http'
+ reactor.connectTCP(host, port, factory)
+ return factory.deferred
+
class SystemFramework(testutil.PollMixin):
numnodes = 5
self.tub.setServiceParent(self.sparent)
self.discard_shares = True
self.mode = mode
- if mode in ("download", "download-GET"):
+ if mode in ("download", "download-GET", "download-GET-slow"):
self.discard_shares = False
self.failed = False
body = "\r\n".join(form) + "\r\n"
headers = {"content-type": "multipart/form-data; boundary=%s" % sepbase,
}
- return getPage(url, method="POST", postdata=body,
- headers=headers, followRedirect=False)
+ return tw_client.getPage(url, method="POST", postdata=body,
+ headers=headers, followRedirect=False)
- def GET_discard(self, urlpath):
- # TODO: Slow
+ def GET_discard(self, urlpath, stall):
url = self.webish_url + urlpath + "?filename=dummy-get.out"
- return downloadPage(url, os.path.join(self.basedir, "dummy-get.out"))
+ return discardPage(url, stall)
def _print_usage(self, res=None):
d = self.control_rref.callRemote("get_memory_usage")
data = "a" * size
url = "/vdrive/global"
d = self.POST(url, t="upload", file=("%d.data" % size, data))
- elif self.mode in ("download", "download-GET"):
+ elif self.mode in ("download", "download-GET", "download-GET-slow"):
# upload the data from a local peer, then have the
# client-under-test download it.
files[name] = self.create_data(name, size)
return d
def _do_download(self, res, size, uris):
- if self.mode not in ("download", "download-GET"):
+ if self.mode not in ("download", "download-GET", "download-GET-slow"):
return
name = '%d' % size
+ print "downloading %s" % name
uri = uris[name]
+
if self.mode == "download":
d = self.control_rref.callRemote("download_from_uri_to_file",
uri, "dummy.out")
- if self.mode == "download-GET":
+ elif self.mode == "download-GET":
url = "/uri/%s" % uri
- d = self.GET_discard(urllib.quote(url))
+ d = self.GET_discard(urllib.quote(url), stall=False)
+ elif self.mode == "download-GET-slow":
+ url = "/uri/%s" % uri
+ d = self.GET_discard(urllib.quote(url), stall=True)
+ def _complete(res):
+ print "downloaded %s" % name
+ return res
+ d.addCallback(_complete)
return d
def do_test(self):