3 import sha as shamodule
6 from pkg_resources import require
8 from pyrrd import graph
9 from pyrrd.rrd import DataSource, RRD, RRA
13 return shamodule.new(s).digest()
19 def __init__(self, nid, introducer, simulator):
21 self.introducer = introducer
22 self.simulator = simulator
24 self.capacity = random.randrange(1000)
28 def permute_peers(self, fileid):
29 permuted = [(sha(fileid+n.nid),n)
30 for n in self.introducer.get_all_nodes()]
34 def publish_file(self, fileid, size, numshares=100):
35 sharesize = 4 * size / numshares
36 permuted = self.permute_peers(fileid)
40 while numshares and permuted:
41 pid,node = permuted.pop(0)
44 if node.accept_share(fileid, sharesize):
45 givento.append((pid,node))
48 # couldn't push, should delete
49 for pid,node in givento:
50 node.delete_share(fileid)
52 self.files.append((fileid, numshares))
53 self.introducer.please_preserve(fileid, size, tried, last_givento)
56 def accept_share(self, fileid, sharesize):
57 if self.utilization < self.capacity:
59 self.shares[fileid] = sharesize
60 self.utilization += sharesize
62 if self.decide(sharesize):
63 # we don't, but we'll make room
64 self.make_space(sharesize)
65 self.shares[fileid] = sharesize
66 self.utilization += sharesize
69 # we're full, try elsewhere
72 def decide(self, sharesize):
73 if sharesize > self.capacity:
76 return random.random() > 0.5
78 def make_space(self, sharesize):
79 assert sharesize <= self.capacity
80 while self.capacity - self.utilization < sharesize:
81 victim = random.choice(self.shares.keys())
82 self.simulator.lost_data(self.shares[victim])
83 self.delete_share(victim)
85 def delete_share(self, fileid):
86 if fileid in self.shares:
87 self.utilization -= self.shares[fileid]
88 del self.shares[fileid]
92 def retrieve_file(self):
95 fileid,numshares = random.choice(self.files)
96 needed = numshares / 4
98 for pid,node in self.permute_peers(fileid):
99 if random.random() > self.simulator.P_NODEAVAIL:
100 continue # node isn't available right now
101 if node.has_share(fileid):
103 if len(peers) >= needed:
107 def delete_file(self):
110 which = random.choice(self.files)
111 self.files.remove(which)
112 fileid,numshares = which
113 self.introducer.delete(fileid)
117 def __init__(self, simulator):
118 self.living_files = {}
119 self.utilization = 0 # total size of all active files
120 self.simulator = simulator
121 self.simulator.stamp_utilization(self.utilization)
123 def get_all_nodes(self):
124 return self.all_nodes
126 def please_preserve(self, fileid, size, tried, last_givento):
127 self.living_files[fileid] = (size, tried, last_givento)
128 self.utilization += size
129 self.simulator.stamp_utilization(self.utilization)
131 def please_delete(self, fileid):
134 def permute_peers(self, fileid):
135 permuted = [(sha(fileid+n.nid),n)
136 for n in self.get_all_nodes()]
140 def delete(self, fileid):
141 permuted = self.permute_peers(fileid)
142 size, tried, last_givento = self.living_files[fileid]
144 while tried and pid < last_givento:
145 pid,node = permuted.pop(0)
146 had_it = node.delete_share(fileid)
149 self.utilization -= size
150 self.simulator.stamp_utilization(self.utilization)
151 del self.living_files[fileid]
155 EVENTS = ["ADDFILE", "DELFILE", "ADDNODE", "DELNODE"]
156 RATE_ADDFILE = 1.0 / 10
157 RATE_DELFILE = 1.0 / 20
158 RATE_ADDNODE = 1.0 / 3000
159 RATE_DELNODE = 1.0 / 4000
163 self.time = 1164783600 # small numbers of seconds since the epoch confuse rrdtool
164 self.prevstamptime = int(self.time)
166 ds = DataSource(ds_name='utilizationds', ds_type='GAUGE', heartbeat=1)
167 rra = RRA(cf='AVERAGE', xff=0.1, steps=1, rows=1200)
168 self.rrd = RRD("/tmp/utilization.rrd", ds=[ds], rra=[rra], start=self.time)
171 self.introducer = q = Introducer(self)
172 self.all_nodes = [Node(randomid(), q, self)
173 for i in range(self.NUM_NODES)]
174 q.all_nodes = self.all_nodes
176 self.schedule_events()
181 self.deleted_files = 0
182 self.published_files = []
183 self.failed_files = 0
184 self.lost_data_bytes = 0 # bytes deleted to make room for new shares
186 def stamp_utilization(self, utilization):
187 if int(self.time) > (self.prevstamptime+1):
188 self.rrd.bufferValue(self.time, utilization)
189 self.prevstamptime = int(self.time)
191 def write_graph(self):
197 def1 = graph.DataDefinition(vname="a", rrdfile='/tmp/utilization.rrd', ds_name='utilizationds')
198 area1 = graph.Area(value="a", color="#990033", legend='utilizationlegend')
199 g = graph.Graph('/tmp/utilization.png', imgformat='PNG', width=540, height=100, vertical_label='utilizationverticallabel', title='utilizationtitle', lower_limit=0)
205 size = random.randrange(1000)
206 n = random.choice(self.all_nodes)
208 print "add_file(size=%d, from node %s)" % (size, n)
210 able = n.publish_file(fileid, size)
213 self.added_files += 1
214 self.added_data += size
215 self.published_files.append(tried)
217 self.failed_files += 1
219 def lost_data(self, size):
220 self.lost_data_bytes += size
222 def delete_file(self):
223 all_nodes = self.all_nodes[:]
224 random.shuffle(all_nodes)
227 self.deleted_files += 1
229 print "no files to delete"
231 def _add_event(self, etype):
232 rate = getattr(self, "RATE_" + etype)
233 next = self.time + random.expovariate(rate)
234 self.next.append((next, etype))
237 def schedule_events(self):
238 types = set([e[1] for e in self.next])
239 for etype in self.EVENTS:
240 if not etype in types:
241 self._add_event(etype)
244 time, etype = self.next.pop(0)
245 assert time > self.time
246 # current_time = self.time
248 self._add_event(etype)
249 if etype == "ADDFILE":
251 elif etype == "DELFILE":
253 elif etype == "ADDNODE":
256 elif etype == "DELNODE":
259 # self.print_stats(current_time, etype)
261 def print_stats_header(self):
262 print "time: added failed lost avg_tried"
264 def print_stats(self, time, etype):
265 if not self.published_files:
268 avg_tried = sum(self.published_files) / len(self.published_files)
269 print time, etype, self.added_data, self.failed_files, self.lost_data_bytes, avg_tried, len(self.introducer.living_files), self.introducer.utilization
275 # rrdtool.create("foo.rrd",
277 # "DS:files-added:DERIVE::0:1000",
278 # "RRA:AVERAGE:1:1:1200",
282 # s.print_stats_header()
283 for i in range(1000):
285 print "%d files added, %d files deleted" % (s.added_files, s.deleted_files)
288 if __name__ == '__main__':