# src/allmydata/frontends/magic_folder.py
# (snapshot of commit: "Add debugging print statement for timestamp comparison")
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify flag IN_EXCL_UNLINK: when watching a directory, stop
# generating events for children once they have been unlinked.  Hard-coded
# here because Twisted's inotify module does not expose this constant.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """
    Return a filesystem-notification module appropriate for this platform:
    the allmydata Windows shim on win32, or Twisted's inotify wrapper where
    the platform supports it.  Raises NotImplementedError (after logging the
    underlying error, if any) when no implementation is available.
    """
    try:
        if sys.platform == "win32":
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        # The import itself can fail (e.g. missing pywin32 pieces on older
        # Windows); record why before translating/re-raising.
        log.msg(e)
        if sys.platform == "win32":
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
43
44
45 class MagicFolder(service.MultiService):
46     name = 'magic-folder'
47
48     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile,
49                  pending_delay=1.0, clock=None):
50         precondition_abspath(local_path_u)
51
52         service.MultiService.__init__(self)
53
54         immediate = clock is not None
55         clock = clock or reactor
56         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
57         if db is None:
58             return Failure(Exception('ERROR: Unable to load magic folder db.'))
59
60         # for tests
61         self._client = client
62         self._db = db
63
64         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
65         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
66
67         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
68         self.downloader = Downloader(client, local_path_u, db, collective_dirnode, upload_dirnode.get_readonly_uri(), clock)
69
70     def startService(self):
71         # TODO: why is this being called more than once?
72         if self.running:
73             return defer.succeed(None)
74         print "%r.startService" % (self,)
75         service.MultiService.startService(self)
76         return self.uploader.start_monitoring()
77
78     def ready(self):
79         """ready is used to signal us to start
80         processing the upload and download items...
81         """
82         d = self.uploader.start_scanning()
83         d2 = self.downloader.start_scanning()
84         d.addCallback(lambda ign: d2)
85         return d
86
87     def finish(self):
88         print "finish"
89         d = self.uploader.stop()
90         d2 = self.downloader.stop()
91         d.addCallback(lambda ign: d2)
92         return d
93
94     def remove_service(self):
95         return service.MultiService.disownServiceParent(self)
96
97
class QueueMixin(HookMixin):
    """
    Shared machinery for Uploader and Downloader: a deque of pending work
    items drained one at a time through a chain of Deferreds
    (``self._lazy_tail``), so items are processed strictly serially.

    Subclasses must provide ``_process(item)`` and ``_when_queue_is_empty()``.
    The 'processed' and 'started' hooks (from HookMixin) let tests
    synchronize with queue activity.
    """
    def __init__(self, client, local_path_u, db, name, clock):
        # :param name: short queue label ('uploader'/'downloader'), used in
        #   log messages and stats counter names.
        # :param clock: reactor or test clock used for delayed queue turns.
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()
        # Tail of the Deferred chain that serializes all item processing.
        self._lazy_tail = defer.succeed(None)
        self._stopped = False
        # Seconds between queue turns; 0 here, overridden by Downloader.
        self._turn_delay = 0

    def _get_filepath(self, relpath_u):
        """Map a u'/'-separated relative path to a FilePath under the magic folder."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-separated relative path."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Adjust a stats counter namespaced by this queue's name."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Deferred-callback-shaped logger: log and pass the result through.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        # Logs to the client log AND to stdout (debugging residue).
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """
        Take one item from the deque and chain its processing onto
        _lazy_tail, then schedule the next turn after _turn_delay.  When the
        deque is empty, chain _when_queue_is_empty() instead (the Downloader
        uses this to trigger its periodic rescan).

        NOTE(review): items are popped from the right (LIFO) while producers
        append to the right — confirm ordering is intentional.
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
165
166
class Uploader(QueueMixin):
    """
    Watches the local magic folder via inotify and uploads new or changed
    files to our upload DMD.  Directories are represented by zero-length
    entries whose encoded name ends with an encoded u'/'; deletions are
    represented by empty files with 'deleted': True metadata.
    """
    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        # immediate=True (tests only): turn the deque synchronously on event.
        self._immediate = immediate

        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        # Relpaths noticed but not yet processed; suppresses duplicate queueing.
        self._pending = set()

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Begin delivering inotify events; fires the 'started' hook."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop watching; fires once the notifier has stopped and the queue tail has drained."""
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_scanning(self):
        """
        Do the initial full scan: walk the local directory tree, then queue
        any db-known paths that were not found on disk (normally files
        deleted while we were not running).
        """
        self._log("start_scanning")
        self.is_ready = True
        self._pending = self._db.get_all_relpaths()
        self._log("all_files %r" % (self._pending))
        d = self._scan(u"")
        def _add_pending(ign):
            # This adds all of the files that were in the db but not already processed
            # (normally because they have been deleted on disk).
            self._log("adding %r" % (self._pending))
            self._deque.extend(self._pending)
        d.addCallback(_add_pending)
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _scan(self, reldir_u):
        """Recursively scan one local directory, processing each child in turn."""
        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        d = defer.succeed(None)
        for child in children:
            _assert(isinstance(child, unicode), child=child)
            # child=child binds the loop variable at definition time.
            d.addCallback(lambda ign, child=child:
                          ("%s/%s" % (reldir_u, child) if reldir_u else child))
            def _add_pending(relpath_u):
                if magicpath.should_ignore_file(relpath_u):
                    return None

                self._pending.add(relpath_u)
                return relpath_u
            d.addCallback(_add_pending)
            # This call to _process doesn't go through the deque, and probably should.
            d.addCallback(self._process)
            d.addBoth(self._call_hook, 'processed')
            d.addErrback(log.err)

        return d

    def _notify(self, opaque, path, events_mask):
        """inotify callback: filter the event and enqueue its relpath."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("ignoring event for %r (already pending)" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._log("appending %r to deque" % (relpath_u,))
        self._deque.append(relpath_u)
        self._pending.add(relpath_u)
        self._count('objects_queued')
        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _when_queue_is_empty(self):
        # The uploader is purely event-driven; nothing to do when idle.
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader
        """
        Upload one relpath: a deletion marker if it vanished, a directory
        marker (plus recursive scan) for a directory, or the file contents
        for a regular file.  Returns a Deferred.
        """
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')
                if not self._db.check_file_db_exists(relpath_u):
                    # Never uploaded it, so nothing to mark deleted.
                    return None

                last_downloaded_timestamp = now
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # A deletion is an empty file with 'deleted': True metadata.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                # BUGFIX: was 'self.warn(...)', but no 'warn' method exists on
                # this class or QueueMixin, so hitting a symlink raised
                # AttributeError.  Log the warning instead.
                self._log("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    # Platform notifier doesn't auto-watch new subdirs; add a watch.
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                # Directories are represented remotely by an empty file whose
                # encoded name carries a trailing encoded u"/".
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_succeeded, _failed)
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                return upload_d
            elif pathinfo.isfile:
                last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                last_downloaded_timestamp = now

                current_version = self._db.get_local_file_version(relpath_u)
                if current_version is None:
                    new_version = 0
                elif self._db.is_new_file(pathinfo, relpath_u):
                    new_version = current_version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp, pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # BUGFIX: was 'self.warn(...)' — no such method; see above.
                self._log("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Fetch remote metadata for an encoded path; a KeyError becomes a Failure."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Fetch the remote filenode for an encoded path; a KeyError becomes a Failure."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
442
443 class WriteFileMixin(object):
444     FUDGE_SECONDS = 10.0
445
446     def _get_conflicted_filename(self, abspath_u):
447         return abspath_u + u".conflict"
448
449     def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
450         self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
451                   % (abspath_u, len(file_contents), is_conflict, now))
452
453         # 1. Write a temporary file, say .foo.tmp.
454         # 2. is_conflict determines whether this is an overwrite or a conflict.
455         # 3. Set the mtime of the replacement file to be T seconds before the
456         #    current local time.
457         # 4. Perform a file replacement with backup filename foo.backup,
458         #    replaced file foo, and replacement file .foo.tmp. If any step of
459         #    this operation fails, reclassify as a conflict and stop.
460         #
461         # Returns the path of the destination file.
462
463         precondition_abspath(abspath_u)
464         replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
465         backup_path_u = abspath_u + u".backup"
466         if now is None:
467             now = time.time()
468
469         # ensure parent directory exists
470         head, tail = os.path.split(abspath_u)
471         mode = 0777 # XXX
472         fileutil.make_dirs(head, mode)
473
474         fileutil.write(replacement_path_u, file_contents)
475         os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
476         if is_conflict:
477             print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
478             return self._rename_conflicted_file(abspath_u, replacement_path_u)
479         else:
480             try:
481                 fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
482                 return abspath_u
483             except fileutil.ConflictError:
484                 return self._rename_conflicted_file(abspath_u, replacement_path_u)
485
486     def _rename_conflicted_file(self, abspath_u, replacement_path_u):
487         self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))
488
489         conflict_path_u = self._get_conflicted_filename(abspath_u)
490         print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
491         if os.path.isfile(replacement_path_u):
492             print "%r exists" % (replacement_path_u,)
493         if os.path.isfile(conflict_path_u):
494             print "%r exists" % (conflict_path_u,)
495
496         fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
497         return conflict_path_u
498
499     def _rename_deleted_file(self, abspath_u):
500         self._log('renaming deleted file to backup: %s' % (abspath_u,))
501         try:
502             fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
503         except IOError:
504             # XXX is this the correct error?
505             self._log("Already gone: '%s'" % (abspath_u,))
506         return abspath_u
507
508
class Downloader(QueueMixin, WriteFileMixin):
    """
    Periodically scans every DMD in the collective (except our own) for
    newer versions of files and downloads them into the local magic folder,
    detecting conflicts via last_downloaded_uri / last_uploaded_uri metadata.
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode, upload_readonly_dircap, clock):
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        # Readonly URI of our own DMD, used to skip scanning ourselves.
        self._upload_readonly_dircap = upload_readonly_dircap

        self._turn_delay = self.REMOTE_SCAN_INTERVAL

    def start_scanning(self):
        """Kick off the first remote scan and start turning the download queue."""
        self._log("start_scanning")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective()
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop turning the queue; fires when the in-flight tail has drained."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        v = self._db.get_local_file_version(relpath_u)
        self._log("v = %r" % (v,))
        return (v is None or v < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        if not self._get_filepath(relpath_u).exists():
            return None
        return self._db.get_local_file_version(relpath_u)

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.  Fires with (None, None) if no DMD
        has an entry for the file.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                # dir_name=dir_name binds the loop variable at definition time.
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the (node, metadata) pair with the largest 'version'.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    # BUGFIX: also select when no node has been chosen yet;
                    # previously 'version > 0' was required, so a file whose
                    # only version was 0 incorrectly yielded (None, None).
                    if node is None or result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """Scan one participant's DMD, accumulating candidate downloads into scan_batch."""
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    # scan_batch: relpath -> list of (file_node, metadata)
                    # candidates, one per DMD that has a newer version.
                    if relpath_u in scan_batch:
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self):
        """Scan every DMD except our own, then queue the best candidate for each path."""
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                # Of all candidates for this path, keep the highest version.
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._count('objects_excluded')
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Idle: wait REMOTE_SCAN_INTERVAL, rescan the collective, then turn again.
        d = task.deferLater(self._clock, self._turn_delay, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader
        """
        Process one queued (relpath_u, file_node, metadata) item: detect
        conflicts, then mkdir / rename-deleted / download-and-write as
        appropriate, and record the result in the db.
        """
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # Already conflicted locally; refuse to touch it again.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            is_conflict = False
            if self._db.check_file_db_exists(relpath_u):
                # A mismatch between what the remote DMD says we last saw and
                # what our db says we last saw means divergence -> conflict.
                # (removed leftover timestamp-comparison debugging prints here)
                dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                local_last_downloaded_uri = self._db.get_last_downloaded_uri(relpath_u)
                if dmd_last_downloaded_uri is not None and local_last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != local_last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                    else:
                        dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
                        local_last_uploaded_uri = self._db.get_last_uploaded_uri(relpath_u)
                        if dmd_last_uploaded_uri != local_last_uploaded_uri:
                            is_conflict = True
                            self._count('objects_conflicted')
            if relpath_u.endswith(u"/"):
                # Trailing slash marks a directory entry.
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # A ConflictError is an expected outcome, not a failure.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d