import os, sha, stat, time, re
+from base64 import b32decode
from foolscap import Referenceable, SturdyRef
from zope.interface import implements
from allmydata.interfaces import RIClient
MY_FURL_FILE = "myself.furl"
SUICIDE_PREVENTION_HOTLINE_FILE = "suicide_prevention_hotline"
SIZELIMIT_FILE = "sizelimit"
+ PUSH_TO_OURSELVES_FILE = "push_to_ourselves"
# we're pretty narrow-minded right now
OLDEST_SUPPORTED_VERSION = allmydata.__version__
self.my_furl = None
self.introducer_client = None
self.init_storage()
+ self.init_options()
self.add_service(Uploader())
self.add_service(Downloader())
self.add_service(VirtualDrive())
no_storage = os.path.exists(NOSTORAGE_FILE)
self.add_service(StorageServer(storedir, sizelimit, no_storage))
+ def init_options(self):
+ self.push_to_ourselves = None
+ filename = os.path.join(self.basedir, self.PUSH_TO_OURSELVES_FILE)
+ if os.path.exists(filename):
+ self.push_to_ourselves = True
+
def _check_hotline(self, hotline_file):
if os.path.exists(hotline_file):
mtime = os.stat(hotline_file)[stat.ST_MTIME]
return []
return self.introducer_client.get_all_peerids()
- def get_permuted_peers(self, key):
+ def get_permuted_peers(self, key, include_myself=True):
"""
@return: list of (permuted-peerid, peerid, connection,)
"""
results = []
+ myid = b32decode(self.tub.tubID.upper())
for peerid, connection in self.introducer_client.get_all_peers():
assert isinstance(peerid, str)
+ if not include_myself and peerid == myid:
+ self.log("get_permuted_peers: removing myself from the list")
+ continue
permuted = bytes_to_long(sha.new(key + peerid).digest())
results.append((permuted, peerid, connection))
results.sort()
return results
+ def get_push_to_ourselves(self):
+ return self.push_to_ourselves
+
def get_encoding_parameters(self):
if not self.introducer_client:
return None
self.log(" introducing ourselves: %s, %s" % (self, self.my_furl))
self._connected = True
d = introducer.callRemote("hello",
- node=self,
- furl=self.my_furl)
+ node=self,
+ furl=self.my_furl)
introducer.notifyOnDisconnect(self._disconnected)
def _disconnected(self):
self.tub = Tub(certFile=certfile)
self.tub.setOption("logLocalFailures", True)
self.tub.setOption("logRemoteFailures", True)
+ # I think self.nodeid is kind of whacked. Shouldn't it equal the
+ # fingerprint portion of our furl?
self.nodeid = b32encode(self.tub.tubID).lower()
f = open(os.path.join(self.basedir, self.NODEIDFILE), "w")
f.write(b32encode(self.nodeid).lower() + "\n")
f = open(os.path.join(clientdir, "debug_no_storage"), "w")
f.write("no_storage\n")
f.close()
+ if self.mode == "upload-self":
+ f = open(os.path.join(clientdir, "push_to_ourselves"), "w")
+ f.write("push_to_ourselves\n")
+ f.close()
self.keepalive_file = os.path.join(clientdir,
"suicide_prevention_hotline")
# now start updating the mtime.
name = '%d' % size
print
print "uploading %s" % name
- if self.mode == "upload":
+ if self.mode in ("upload", "upload-self"):
files[name] = self.create_data(name, size)
d = control.callRemote("upload_from_file_to_uri", files[name])
def _done(uri):
class FakeClient:
    """Stand-in for the real Client, used by the upload tests.

    Hands out 50 fake peers and answers the same configuration queries
    that the peer-selection code makes of a real client.
    """
    def __init__(self, mode="good"):
        # 'mode' is forwarded to each FakePeer to control its behavior.
        self.mode = mode
    def get_permuted_peers(self, storage_index, include_myself=True):
        # include_myself defaults to True to stay signature-compatible with
        # the real Client.get_permuted_peers; the fake ignores the flag
        # because none of the fabricated peers is "us".
        return [ ("%20d" % fakeid, "%20d" % fakeid, FakePeer(self.mode),)
                 for fakeid in range(50) ]
    def get_push_to_ourselves(self):
        # No local preference: the uploader should use its own default.
        return None
    def get_encoding_parameters(self):
        # None means "use the uploader's default encoding parameters".
        return None
def get_shareholders(self, client,
storage_index, share_size, block_size,
- num_segments, total_shares, shares_of_happiness):
+ num_segments, total_shares, shares_of_happiness,
+ push_to_ourselves):
"""
@return: a set of PeerTracker instances that have agreed to hold some
shares for us
# we are responsible for locating the shareholders. self._encoder is
# responsible for handling the data and sending out the shares.
- peers = client.get_permuted_peers(storage_index)
- assert peers
+ peers = client.get_permuted_peers(storage_index, push_to_ourselves)
+
+ assert peers, "peer selection left us with zero peers for our data"
# this needed_hashes computation should mirror
# Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
block_size = encoder.get_param("block_size")
num_segments = encoder.get_param("num_segments")
k,desired,n = encoder.get_param("share_counts")
+ push_to_ourselves = self._options.get("push_to_ourselves", False)
gs = peer_selector.get_shareholders
d = gs(self._client, storage_index, share_size, block_size,
- num_segments, n, desired)
+ num_segments, n, desired, push_to_ourselves)
return d
def set_shareholders(self, used_peers, encoder):
# this returns the URI
assert self.parent
assert self.running
+ push_to_ourselves = self.parent.get_push_to_ourselves()
+ if push_to_ourselves is not None:
+ options["push_to_ourselves"] = push_to_ourselves
+
uploadable = IUploadable(uploadable)
d = uploadable.get_size()
def _got_size(size):