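"""
Magic Folder frontend: watches a local directory and keeps it synchronized
with a Tahoe-LAFS grid, uploading local changes and downloading remote ones.
"""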
import sys, os
import os.path
from collections import deque
import time

from twisted.internet import defer, reactor, task
from twisted.python.failure import Failure
from twisted.python import runtime
from twisted.application import service

from allmydata.util import fileutil
from allmydata.interfaces import IDirectoryNode
from allmydata.util import log
from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
from allmydata.util.assertutil import precondition, _assert
from allmydata.util.deferredutil import HookMixin
from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
     extend_filepath, unicode_from_filepath, unicode_segments_from, \
     quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
from allmydata.immutable.upload import FileName, Data
from allmydata import magicfolderdb, magicpath

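# Linux inotify flag to exclude events on unlinked files.  Assumed to match
# the value in <sys/inotify.h>; it is hard-coded here because the inotify
# modules we use do not appear to export it.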
IN_EXCL_UNLINK = 0x04000000L

def get_inotify_module():
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise


class MagicFolder(service.MultiService):
    name = 'magic-folder'

    def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
                 pending_delay=1.0, clock=reactor):
        precondition_abspath(local_path_u)

        service.MultiService.__init__(self)

        db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
        if db is None:
            # A Failure returned from __init__ would be silently discarded;
            # raise instead so the caller actually sees the error.
            raise Exception('ERROR: Unable to load magic folder db.')

        # for tests
        self._client = client
        self._db = db

        self.is_ready = False

        self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
        self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)

    def startService(self):
        # TODO: why is this being called more than once?
        if self.running:
            return defer.succeed(None)
        print "%r.startService" % (self,)
        service.MultiService.startService(self)
        return self.uploader.start_monitoring()

    def ready(self):
        """ready is used to signal us to start processing the
        upload and download items.
        """
        self.is_ready = True
        d = self.uploader.start_scanning()
        d2 = self.downloader.start_scanning()
        d.addCallback(lambda ign: d2)
        return d

    def finish(self):
        print "finish"
        d = self.uploader.stop()
        d2 = self.downloader.stop()
        d.addCallback(lambda ign: d2)
        return d

    def remove_service(self):
        return service.MultiService.disownServiceParent(self)


class QueueMixin(HookMixin):
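    """
    Common machinery for the Uploader and Downloader: a deque of pending
    work items, drained one item per turn, with successive turns serialized
    on the self._lazy_tail Deferred chain.
    """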
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0
        # _append_to_deque consults is_ready; initialize it here so that
        # subclasses which never set it don't hit an AttributeError.
        self.is_ready = False

    def _get_filepath(self, relpath_u):
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def warn(self, msg):
        # Uploader._process calls self.warn, but no 'warn' method was defined
        # anywhere in this hierarchy; route warnings to the log.
        self._log(msg)

    def _append_to_deque(self, relpath_u):
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
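        """
        Take one turn of the queue: pop an item and chain its processing
        onto self._lazy_tail, followed by the next turn after a delay of
        self._turn_delay seconds.  When the deque is empty, defer to
        _when_queue_is_empty() instead.
        """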
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))


class Uploader(QueueMixin):
    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
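        """
        Start reading filesystem notifications for the local directory.
        Returns a Deferred that fires once monitoring has started.
        """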
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending,))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending,))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
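        """
        Scan a directory below the magic folder root (reldir_u is its
        relative path, u"" for the root itself), adding each non-ignored
        child to self._pending and processing it.
        """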
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        return defer.succeed(None)

    def _process(self, relpath_u):
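        """
        Upload a single file or directory, or record its deletion, depending
        on its current state on disk and in the magic-folder db.
        """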
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d


class WriteFileMixin(object):
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)
        mode = 0777 # XXX
        fileutil.make_dirs(head, mode)

        fileutil.write(replacement_path_u, file_contents)
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except IOError:
            # XXX is this the correct error?
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u


class Downloader(QueueMixin, WriteFileMixin):
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should
        be downloaded. We check the remote metadata version against our magic-folder db
        version number; the latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string and checks whether a file
        by that name exists locally; if not, it returns None, otherwise it
        returns the version number recorded for it in our magic-folder db.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a Deferred that fires with a 2-tuple containing
        the file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        if name in self._download_scan_batch:
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
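        """
        List one member's directory from the collective and add any entry
        whose remote version is newer than our local version (or that we
        have never seen) to the download batch.
        """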
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote")
        return d

    def _scan_remote_collective(self):
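        """
        Scan every member directory in the collective, filter the resulting
        batch down to the files we should actually fetch, and put those on
        the download queue.
        """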
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            # Return a fired Deferred so that callers can always chain callbacks.
            return defer.succeed(None)
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = [x for x in result.keys()]
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        self._log("_filter_scan_batch")
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
            else:
                self._log("Excluding %r" % (relpath_u,))
                self._count('objects_excluded')
                self._call_hook(None, 'processed')
        return extension

    def _when_queue_is_empty(self):
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
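        """
        Download one (relpath_u, file_node, metadata) item: write the remote
        contents to disk (or apply a remote deletion or mkdir), detecting a
        conflict when the remote 'last_downloaded_uri' no longer matches the
        one recorded in our local db.
        """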
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)
        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            if self._db.check_file_db_exists(relpath_u):
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                print "metadata %r" % (metadata,)
                print "<<<<--- if %r != %r" % (dmd_last_downloaded_uri, local_last_downloaded_uri)
                if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != local_last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')

                #dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                #local_last_uploaded_uri = ...

            if relpath_u.endswith(u"/"):
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def remove_from_pending(res):
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        def trap_conflicts(f):
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d