# src/allmydata/frontends/magic_folder.py

import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath


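# Value of the Linux inotify IN_EXCL_UNLINK flag, hard-coded here presumably
# because the version of Twisted's inotify module in use does not export it.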
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
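    """
    Return a platform-appropriate filesystem notification module: the
    allmydata Windows shim on win32, or Twisted's inotify wrapper where the
    platform supports inotify. Raises NotImplementedError elsewhere.
    """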
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start
        processing the upload and download items.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
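    """
    Common machinery for Uploader and Downloader: a deque of pending work
    items, a set of pending relpaths (used for de-duplication), and a lazy
    Deferred tail that serializes processing turns on the supplied clock.
    """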
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
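        """
        Queue relpath_u for processing, unless it is already pending or
        should be ignored, and kick off a turn if we are ready.
        """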
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
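        """
        Take one item off the deque and chain its processing onto the lazy
        tail; when the deque is empty, defer to _when_queue_is_empty().
        """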
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            # note that pop() takes the most recently queued item (LIFO)
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
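    """
    Watches the local magic folder with the platform's notification API and
    uploads new or changed files and directories to the upload dirnode.
    """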
    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
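        # The uploader is event-driven (notifications re-fill the queue),
        # so there is nothing to poll for when the queue drains.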
        return defer.succeed(None)

    def _process(self, relpath_u):
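        """
        Handle one queued relpath: record a deletion if the local object has
        disappeared, create and recurse into remote subdirectories, upload
        regular files, and warn about symlinks and special files.
        """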
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
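    """
    Helpers for writing downloaded contents to disk: replacement via a
    temporary file, plus conflict and deletion renaming.
    """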
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            self._log("is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u))
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        self._log("rename %r %r" % (replacement_path_u, conflict_path_u))
        if os.path.isfile(replacement_path_u):
            self._log("%r exists" % (replacement_path_u,))
        if os.path.isfile(conflict_path_u):
            self._log("%r exists" % (conflict_path_u,))

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except IOError:
            # XXX is this the correct error?
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
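    """
    Periodically scans the other members' directories in the collective and
    downloads any newer file versions into the local magic folder.
    """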
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should
        be downloaded. We check the remote metadata version against our magic-folder db
        version number; the latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string and checks whether the file
        exists locally; if not, it returns None, otherwise it returns the version
        number from the file's entry in our magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with a 2-tuple containing the
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name].append((file_node, metadata))
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote")
        return d

    def _scan_remote_collective(self):
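        """
        Scan each member's directory in the collective, batch the versions
        found, filter out already-pending or stale entries, and queue the
        rest for download.
        """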
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            # return a Deferred so that callers can always chain callbacks
            return defer.succeed(None)
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = list(result.keys())
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update([r[0] for r in result])
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        self._log("_filter_scan_batch")
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension.append((relpath_u, file_node, metadata))
            else:
                self._log("Excluding %r" % (relpath_u,))
                self._count('objects_excluded')
                self._call_hook(None, 'processed')
        return extension

    def _when_queue_is_empty(self):
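        # Unlike the uploader, the downloader polls: rescan the remote
        # collective after the turn delay, then take another turn.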
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
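        """
        Download one queued item: detect conflicts against the local db,
        write (or mkdir, or rename-as-deleted) the result locally, and
        record the new version in the magic-folder db.
        """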
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)
        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            if self._db.check_file_db_exists(relpath_u):
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                self._log("metadata %r" % (metadata,))
                self._log("checking %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri))
                if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != local_last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')

                #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                #local_last_uploaded_uri = ...

            if relpath_u.endswith(u"/"):
                self._log("mkdir(%r)" % (abspath_u,))
                d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                d.addCallback(lambda ign: abspath_u)
            else:
                d.addCallback(lambda ign: file_node.download_best_version())
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d