import sys, os
import shutil
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


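# IN_EXCL_UNLINK is the Linux inotify flag (available since kernel 2.6.36)
# that suppresses events for children of a watched directory once they have
# been unlinked. It is apparently not exported by twisted.internet.inotify,
# so the raw flag value is defined here.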
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


class MagicFolder(service.MultiService):
    name = 'magic-folder'
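    """
    A MultiService wrapping one magic folder: an Uploader that watches the
    local directory and a Downloader that polls the remote collective.
    startService begins filesystem monitoring; ready() is called later to
    signal that the initial upload and download scans may start.
    """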

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
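    """
    Base class for the Uploader and Downloader work queues. Pending relative
    paths are appended to self._deque; _turn_deque pops one item at a time
    and chains its processing onto self._lazy_tail, so at most one item is
    in flight at once. Subclasses implement _process (handle one item) and
    _when_queue_is_empty (what to do when the deque drains).
    """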
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s

    def _append_to_deque(self, relpath_u):
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

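    # _turn_deque serializes all queue processing: it pops one item, chains
    # its handling onto self._lazy_tail, and reschedules itself after
    # self._turn_delay. When the deque is empty, control passes to the
    # subclass's _when_queue_is_empty (the Uploader simply stops; the
    # Downloader schedules another remote scan).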
    def _turn_deque(self):
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
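        # The mask below means: IN_CREATE (file or directory created),
        # IN_CLOSE_WRITE (a file opened for writing was closed),
        # IN_MOVED_TO / IN_MOVED_FROM (renames into and out of watched
        # directories), IN_DELETE (deleted), IN_ONLYDIR (only watch the path
        # if it is a directory), plus IN_EXCL_UNLINK defined above.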
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
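        """
        List the local directory reldir_u (a relative path; u"" is the magic
        folder root), add each non-ignored child to self._pending, and
        process it. Subdirectories are recursed into because _process calls
        back into _scan for each directory it uploads.
        """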
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
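        # The uploader is event-driven: inotify will wake us up again, so
        # there is nothing to poll for once the queue drains (contrast with
        # Downloader._when_queue_is_empty, which schedules another remote
        # scan).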
        return defer.succeed(None)

    def _process(self, relpath_u):
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
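            # Four cases, keyed off the current state of the local path:
            #   * vanished: if the db knows the file, record the deletion by
            #     uploading an empty file with 'deleted': True metadata;
            #   * symlink: warn and skip;
            #   * directory: start watching it, create the corresponding
            #     remote directory entry, then scan it;
            #   * regular file: upload a new version if the db says the file
            #     has changed, bumping the version number.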
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
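    # Downloaded files are written with an mtime FUDGE_SECONDS in the past
    # (see the os.utime call below); this appears to be so that a subsequent
    # local write always moves the mtime forward and is therefore detected
    # as a new local change rather than being confused with the download.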
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            self._log("is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u))
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        self._log("rename %r %r" % (replacement_path_u, conflict_path_u))
        if os.path.isfile(replacement_path_u):
            self._log("%r exists" % (replacement_path_u,))
        if os.path.isfile(conflict_path_u):
            self._log("%r exists" % (conflict_path_u,))

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]
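        # Each remote scan records every (filenode, metadata) pair seen for
        # a given relpath across all participants' DMDs; _filter_scan_batch
        # later picks the highest version for each path and decides whether
        # it should be downloaded.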

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        the latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode relative path and returns the version
        number recorded for that file in our magic-folder db, or None if the
        file no longer exists on disk.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a Deferred that fires with a (file node, metadata)
        pair for the latest version of that file located in the magic-folder
        collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
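        # Pick the (node, metadata) pair with the highest 'version' among the
        # successful results. Note that max_version starts at 0 and the
        # comparison is strict, so an entry whose version is exactly 0 is
        # never selected and (None, None) is returned instead.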
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote")
        return d

    def _scan_remote_collective(self):
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            return
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = [x for x in result.keys()]
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        self._log("_filter_scan_batch")
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
            else:
                self._log("Excluding %r" % (relpath_u,))
                self._count('objects_excluded')
                self._call_hook(None, 'processed')
        return extension

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
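        """
        Download one queued item, a (relpath_u, file_node, metadata) tuple.
        If the DMD's last_downloaded_uri differs from the one in our db, the
        file has changed on both sides and the download is marked as a
        conflict, in which case it is written to a '.conflict' sibling
        instead of overwriting the local file.
        """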
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)
        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            if self._db.check_file_db_exists(relpath_u):
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                self._log("metadata %r" % (metadata,))
                self._log("checking %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri))
                if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != local_last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')

                #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                #local_last_uploaded_uri = ...

            if relpath_u.endswith(u"/"):
                self._log("mkdir(%r)" % (abspath_u,))
                d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                d.addCallback(lambda ign: abspath_u)
            else:
                d.addCallback(lambda ign: file_node.download_best_version())
                if metadata.get('deleted', False):
                    d.addCallback(lambda result: self._unlink_deleted_file(abspath_u, result))
                else:
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d

    def _unlink_deleted_file(self, abspath_u, result):
        try:
            self._log('unlinking: %s' % (abspath_u,))
            shutil.move(abspath_u, abspath_u + '.backup')
        except IOError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u