]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
51750888cdd0b37e1f5a6d2ecabefc4450800bdc
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 import shutil
5 from collections import deque
6 import time
7
8 from twisted.internet import defer, reactor, task
9 from twisted.python.failure import Failure
10 from twisted.python import runtime
11 from twisted.application import service
12
13 from allmydata.util import fileutil
14 from allmydata.interfaces import IDirectoryNode
15 from allmydata.util import log
16 from allmydata.util.fileutil import precondition_abspath, get_pathinfo
17 from allmydata.util.assertutil import precondition
18 from allmydata.util.deferredutil import HookMixin
19 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
20      extend_filepath, unicode_from_filepath, unicode_segments_from, \
21      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
22 from allmydata.immutable.upload import FileName, Data
23 from allmydata import magicfolderdb, magicpath
24
25
# inotify flag: stop reporting events for a watched child once it has been
# unlinked.  Defined numerically because twisted.internet.inotify does not
# export it.  NOTE(review): value appears to match the Linux
# <sys/inotify.h> IN_EXCL_UNLINK constant -- confirm against kernel headers.
IN_EXCL_UNLINK = 0x04000000L
27
def get_inotify_module():
    """Return a filesystem-notification module suitable for this platform.

    On Windows, the allmydata.windows.inotify shim is used; where Twisted
    reports native inotify support (i.e. Linux), twisted.internet.inotify
    is used.  On any other platform, or if the import itself fails,
    NotImplementedError is raised.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
        elif runtime.platform.supportsINotify():
            from twisted.internet import inotify
        else:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "This currently requires Linux or Windows.")
        return inotify
    except (ImportError, AttributeError) as e:
        log.msg(e)
        if sys.platform == "win32":
            # On Windows, an import failure of the shim most likely means an
            # unsupported OS version, so give a more specific message.
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
44
45
46 class MagicFolder(service.MultiService):
47     name = 'magic-folder'
48
49     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
50                  pending_delay=1.0, clock=reactor):
51         precondition_abspath(local_path_u)
52
53         service.MultiService.__init__(self)
54
55         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
56         if db is None:
57             return Failure(Exception('ERROR: Unable to load magic folder db.'))
58
59         # for tests
60         self._client = client
61         self._db = db
62
63         self.is_ready = False
64
65         self.uploader = Uploader(client, local_path_u, db, upload_dircap, pending_delay, clock)
66         self.downloader = Downloader(client, local_path_u, db, collective_dircap, clock)
67
68     def startService(self):
69         # TODO: why is this being called more than once?
70         if self.running:
71             return defer.succeed(None)
72         print "%r.startService" % (self,)
73         service.MultiService.startService(self)
74         return self.uploader.start_monitoring()
75
76     def ready(self):
77         """ready is used to signal us to start
78         processing the upload and download items...
79         """
80         self.is_ready = True
81         d = self.uploader.start_scanning()
82         d2 = self.downloader.start_scanning()
83         d.addCallback(lambda ign: d2)
84         return d
85
86     def finish(self):
87         print "finish"
88         d = self.uploader.stop()
89         d2 = self.downloader.stop()
90         d.addCallback(lambda ign: d2)
91         return d
92
93     def remove_service(self):
94         return service.MultiService.disownServiceParent(self)
95
96
class QueueMixin(HookMixin):
    """Common queue-driving machinery shared by Uploader and Downloader.

    Work items accumulate in self._deque and are drained one at a time by
    _turn_deque(), which chains each step onto self._lazy_tail (a Deferred
    acting as the tail of the processing chain) so items are handled
    strictly sequentially.

    Subclasses must define _process(item) and _when_queue_is_empty().
    _append_to_deque() also reads self.is_ready, which is set by a
    subclass (Uploader) -- NOTE(review): Downloader never sets it, but it
    also never calls _append_to_deque in this file.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name  # 'uploader' or 'downloader'; used in logs and counter names
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # Tail of the sequential processing chain; every new step is
        # appended to this Deferred.
        self._lazy_tail = defer.succeed(None)
        self._pending = set()
        self._stopped = False
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a u"/"-separated relative path to a FilePath under the local dir."""
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u"/"-separated relative path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Bump a stats counter namespaced by queue name."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Pass-through logging callback suitable for addCallback/addBoth.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _append_to_deque(self, relpath_u):
        """Queue relpath_u for processing, unless already pending or ignorable."""
        self._log("_append_to_deque(%r)" % (relpath_u,))
        if relpath_u in self._pending or magicpath.should_ignore_file(relpath_u):
            return
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            self._clock.callLater(0, self._turn_deque)

    def _turn_deque(self):
        """Take one item off the deque and chain its processing onto _lazy_tail.

        NOTE(review): pop() takes from the right, the same end append()
        adds to, so the most recently queued item is processed first
        (LIFO); confirm whether FIFO (popleft) was intended.
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            # Schedule the next turn after _turn_delay seconds.
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
174
175
class Uploader(QueueMixin):
    """Watches the local directory with inotify and uploads created,
    changed, and deleted files into this client's upload dirnode."""

    def __init__(self, client, local_path_u, db, upload_dircap, pending_delay, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False

        # TODO: allow a path rather than a cap URI.
        self._upload_dirnode = self._client.create_node_from_uri(upload_dircap)
        if not IDirectoryNode.providedBy(self._upload_dirnode):
            raise AssertionError("The URI in 'private/magic_folder_dircap' does not refer to a directory.")
        if self._upload_dirnode.is_unknown() or self._upload_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/magic_folder_dircap' is not a writecap to a directory.")

        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()

        # Only the Windows shim exposes set_pending_delay (event coalescing).
        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF, IN_MOVED_FROM, or IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Start the inotify reader; fires the 'started' hook when done."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop watching; returned Deferred fires once in-flight work drains."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        # Wait for the processing chain to finish whatever it was doing.
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """Scan the whole local directory, then queue db entries that were
        not found on disk (normally files deleted while we were not running)."""
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """Recursively process every child of local directory reldir_u.

        Raises Exception if the directory cannot be listed.
        """
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            assert isinstance(child, unicode), child
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        """inotify callback: translate an event into a queued relative path."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring inotify event for creation of file %r\n" % (path,))
            return

        relpath_u = self._get_relpath(path)
        self._append_to_deque(relpath_u)

    def _when_queue_is_empty(self):
        # The uploader is purely event-driven; nothing to do on an empty queue.
        return defer.succeed(None)

    def _process(self, relpath_u):
        """Upload one object (file, directory, or deletion) at relpath_u.

        Returns a Deferred that fires when the upload (if any) completes.
        """
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("pending = %r, about to remove %r" % (self._pending, relpath_u))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # The object disappeared locally: record a deletion upload.
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading '{0}'".format(relpath_u))
                    self._count('objects_not_uploaded')
                    return

                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                # A deletion is represented by an empty file with
                # 'deleted': True in its metadata.
                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # BUGFIX: was `self.warn(...)`, but no warn() method exists
                # on this class or its bases in this module, which would
                # raise AttributeError; log the warning instead.
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Watch the new subdirectory, publish it as an empty
                # "dirname/" entry, then scan its contents.
                self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading '{0}'".format(relpath_u))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # BUGFIX: was `self.warn(...)` -- see symlink case above.
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%r while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Return a Deferred for the metadata of an entry, or a Failure if absent."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Return a Deferred for the filenode of an entry, or a Failure if absent."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
426
427
428 class WriteFileMixin(object):
429     FUDGE_SECONDS = 10.0
430
431     def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
432         self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
433                   % (abspath_u, len(file_contents), is_conflict, now))
434
435         # 1. Write a temporary file, say .foo.tmp.
436         # 2. is_conflict determines whether this is an overwrite or a conflict.
437         # 3. Set the mtime of the replacement file to be T seconds before the
438         #    current local time.
439         # 4. Perform a file replacement with backup filename foo.backup,
440         #    replaced file foo, and replacement file .foo.tmp. If any step of
441         #    this operation fails, reclassify as a conflict and stop.
442         #
443         # Returns the path of the destination file.
444
445         precondition_abspath(abspath_u)
446         replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
447         backup_path_u = abspath_u + u".backup"
448         if now is None:
449             now = time.time()
450
451         # ensure parent directory exists
452         head, tail = os.path.split(abspath_u)
453         mode = 0777 # XXX
454         fileutil.make_dirs(head, mode)
455
456         fileutil.write(replacement_path_u, file_contents)
457         os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
458         if is_conflict:
459             return self._rename_conflicted_file(abspath_u, replacement_path_u)
460         else:
461             try:
462                 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
463                 return abspath_u
464             except fileutil.ConflictError:
465                 return self._rename_conflicted_file(abspath_u, replacement_path_u)
466
467     def _rename_conflicted_file(self, abspath_u, replacement_path_u):
468         self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
469
470         conflict_path_u = abspath_u + u".conflict"
471         fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
472         return conflict_path_u
473
474
class Downloader(QueueMixin, WriteFileMixin):
    """Periodically scans every member directory in the collective dirnode
    and downloads any file whose remote version is newer than the version
    recorded in the local magic-folder db."""

    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # TODO: allow a path rather than a cap URI.
        self._collective_dirnode = self._client.create_node_from_uri(collective_dircap)

        if not IDirectoryNode.providedBy(self._collective_dirnode):
            raise AssertionError("The URI in 'private/collective_dircap' does not refer to a directory.")
        if self._collective_dirnode.is_unknown() or not self._collective_dirnode.is_readonly():
            raise AssertionError("The URI in 'private/collective_dircap' is not a readonly cap to a directory.")

        # Between queue turns, wait this long before rescanning the grid.
        self._turn_delay = self.REMOTE_SCAN_INTERVAL
        self._download_scan_batch = {} # path -> [(filenode, metadata)]

    def start_scanning(self):
        """Kick off the first remote scan and start turning the queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning; returned Deferred fires once in-flight work drains."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        # Note: the existence check is against the local filesystem, not the db.
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.

        Fires with (None, None) if no member holds the file.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            # consumeErrors: members that lack the file just don't count.
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the largest 'version'.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _append_to_batch(self, name, file_node, metadata):
        # Collect every member's copy of `name`; _filter_scan_batch later
        # picks the highest-version entry.
        if self._download_scan_batch.has_key(name):
            self._download_scan_batch[name] += [(file_node, metadata)]
        else:
            self._download_scan_batch[name] = [(file_node, metadata)]

    def _scan_remote(self, nickname, dirnode):
        """Scan one member's dirnode, batching entries newer than our local copy."""
        self._log("_scan_remote nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))
                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    self._append_to_batch(relpath_u, file_node, metadata)
        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote")
        return d

    def _scan_remote_collective(self):
        """Scan every member directory, then queue the filtered batch.

        NOTE(review): returns None (not a Deferred) when the collective
        dirnode is absent; the callers in this file tolerate that, but
        confirm before adding new callers.
        """
        self._log("_scan_remote_collective")
        self._download_scan_batch = {} # XXX

        if self._collective_dirnode is None:
            return
        collective_dirmap_d = self._collective_dirnode.list()
        def do_list(result):
            others = [x for x in result.keys()]
            return result, others
        collective_dirmap_d.addCallback(do_list)
        def scan_collective(result):
            # Scan the members sequentially by chaining onto one Deferred.
            d = defer.succeed(None)
            collective_dirmap, others_list = result
            for dir_name in others_list:
                d.addCallback(lambda x, dir_name=dir_name: self._scan_remote(dir_name, collective_dirmap[dir_name][0]))
                # XXX todo add errback
            return d
        collective_dirmap_d.addCallback(scan_collective)
        collective_dirmap_d.addCallback(self._filter_scan_batch)
        collective_dirmap_d.addCallback(self._add_batch_to_download_queue)
        return collective_dirmap_d

    def _add_batch_to_download_queue(self, result):
        """Extend the deque/pending set with (relpath, node, metadata) triples."""
        self._log("result = %r" % (result,))
        self._log("deque = %r" % (self._deque,))
        self._deque.extend(result)
        self._log("deque after = %r" % (self._deque,))
        self._count('objects_queued', len(result))
        self._log("pending = %r" % (self._pending,))
        # _pending holds relpaths only (x[0]), mirroring the uploader's set.
        self._pending.update(map(lambda x: x[0], result))
        self._log("pending after = %r" % (self._pending,))

    def _filter_scan_batch(self, result):
        """Reduce the scan batch to one highest-version entry per path that
        is not already pending and that _should_download approves."""
        self._log("_filter_scan_batch")
        extension = [] # consider whether this should be a dict
        for relpath_u in self._download_scan_batch.keys():
            if relpath_u in self._pending:
                continue
            file_node, metadata = max(self._download_scan_batch[relpath_u], key=lambda x: x[1]['version'])
            if self._should_download(relpath_u, metadata['version']):
                extension += [(relpath_u, file_node, metadata)]
            else:
                self._log("Excluding '{0}'".format(relpath_u))
                self._count('objects_excluded')
                self._call_hook(None, 'processed')
        return extension

    def _when_queue_is_empty(self):
        # Idle: rescan the collective after _turn_delay, then turn again.
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        """Download one queued (relpath_u, file_node, metadata) item and
        record it in the db.  Directories (trailing "/") are just created;
        'deleted' entries unlink the local file."""
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)

        d = defer.succeed(None)
        if relpath_u.endswith(u"/"):
            self._log("mkdir(%r)" % (abspath_u,))
            d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
            d.addCallback(lambda ign: abspath_u)
        else:
            d.addCallback(lambda ign: file_node.download_best_version())
            if metadata.get('deleted', False):
                d.addCallback(lambda result: self._unlink_deleted_file(abspath_u, result))
            else:
                d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents, is_conflict=False))

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)
            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            # NOTE(review): downloads are recorded through did_upload_version
            # as well -- the db call is shared with the uploader; confirm
            # against magicfolderdb.
            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f
        d.addCallbacks(do_update_db, failed)
        def remove_from_pending(res):
            # addBoth: the path leaves the pending set on success or failure.
            self._pending.remove(relpath_u)
            return res
        d.addBoth(remove_from_pending)
        return d

    def _unlink_deleted_file(self, abspath_u, result):
        """Move a remotely-deleted file aside as '.backup'; tolerate it
        already being gone."""
        try:
            self._log('unlinking: %s' % (abspath_u,))
            shutil.move(abspath_u, abspath_u + '.backup')
        except IOError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u