]> git.rkrishnan.org Git - tahoe-lafs/tahoe-lafs.git/blob - src/allmydata/frontends/magic_folder.py
Naive periodic full scan
[tahoe-lafs/tahoe-lafs.git] / src / allmydata / frontends / magic_folder.py
1
2 import sys, os
3 import os.path
4 from collections import deque
5 import time
6
7 from twisted.internet import defer, reactor, task
8 from twisted.python.failure import Failure
9 from twisted.python import runtime
10 from twisted.application import service
11
12 from allmydata.util import fileutil
13 from allmydata.interfaces import IDirectoryNode
14 from allmydata.util import log
15 from allmydata.util.fileutil import precondition_abspath, get_pathinfo, ConflictError
16 from allmydata.util.assertutil import precondition, _assert
17 from allmydata.util.deferredutil import HookMixin
18 from allmydata.util.encodingutil import listdir_filepath, to_filepath, \
19      extend_filepath, unicode_from_filepath, unicode_segments_from, \
20      quote_filepath, quote_local_unicode_path, quote_output, FilenameEncodingError
21 from allmydata.immutable.upload import FileName, Data
22 from allmydata import magicfolderdb, magicpath
23
24
# Linux inotify flag: stop generating events for a watched child once it has
# been unlinked from the watched directory.  Defined here as a raw constant,
# presumably because Twisted's inotify wrapper does not expose it -- TODO confirm.
IN_EXCL_UNLINK = 0x04000000L
26
def get_inotify_module():
    """Return the platform-appropriate filesystem-notification module.

    On Windows this is allmydata.windows.inotify; on platforms where Twisted
    reports inotify support it is twisted.internet.inotify.  Raises
    NotImplementedError (after logging the underlying ImportError or
    AttributeError, if any) when no backend is available.
    """
    on_windows = (sys.platform == "win32")
    try:
        if on_windows:
            from allmydata.windows import inotify
            return inotify
        if runtime.platform.supportsINotify():
            from twisted.internet import inotify
            return inotify
        raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                  "This currently requires Linux or Windows.")
    except (ImportError, AttributeError) as e:
        # Log the import failure, then translate it into a clearer error on
        # Windows (older Windows lacks the needed APIs); re-raise elsewhere.
        log.msg(e)
        if on_windows:
            raise NotImplementedError("filesystem notification needed for Magic Folder is not supported.\n"
                                      "Windows support requires at least Vista, and has only been tested on Windows 7.")
        raise
43
44
def is_new_file(pathinfo, db_entry):
    """Return True when the local file state differs from its db record.

    pathinfo: object with .exists, .size, .ctime, .mtime for the local path.
    db_entry: matching magic-folder db row (or None when never recorded).

    A missing db_entry always counts as new; a nonexistent local path whose
    db record also has size None counts as unchanged.
    """
    if db_entry is None:
        return True

    if not pathinfo.exists and db_entry.size is None:
        return False

    observed = (pathinfo.size, pathinfo.ctime, pathinfo.mtime)
    recorded = (db_entry.size, db_entry.ctime, db_entry.mtime)
    return observed != recorded
54
55
56 class MagicFolder(service.MultiService):
57     name = 'magic-folder'
58
59     def __init__(self, client, upload_dircap, collective_dircap, local_path_u, dbfile, umask,
60                  pending_delay=1.0, clock=None):
61         precondition_abspath(local_path_u)
62
63         service.MultiService.__init__(self)
64
65         immediate = clock is not None
66         clock = clock or reactor
67         db = magicfolderdb.get_magicfolderdb(dbfile, create_version=(magicfolderdb.SCHEMA_v1, 1))
68         if db is None:
69             return Failure(Exception('ERROR: Unable to load magic folder db.'))
70
71         # for tests
72         self._client = client
73         self._db = db
74
75         upload_dirnode = self._client.create_node_from_uri(upload_dircap)
76         collective_dirnode = self._client.create_node_from_uri(collective_dircap)
77
78         self.uploader = Uploader(client, local_path_u, db, upload_dirnode, pending_delay, clock, immediate)
79         self.downloader = Downloader(client, local_path_u, db, collective_dirnode,
80                                      upload_dirnode.get_readonly_uri(), clock, self.uploader.is_pending, umask)
81
82     def startService(self):
83         # TODO: why is this being called more than once?
84         if self.running:
85             return defer.succeed(None)
86         print "%r.startService" % (self,)
87         service.MultiService.startService(self)
88         return self.uploader.start_monitoring()
89
90     def ready(self):
91         """ready is used to signal us to start
92         processing the upload and download items...
93         """
94         self.uploader.start_uploading()  # synchronous
95         return self.downloader.start_downloading()
96
97     def finish(self):
98         print "finish"
99         d = self.uploader.stop()
100         d2 = self.downloader.stop()
101         d.addCallback(lambda ign: d2)
102         return d
103
104     def remove_service(self):
105         return service.MultiService.disownServiceParent(self)
106
107
class QueueMixin(HookMixin):
    """Shared machinery for Uploader and Downloader.

    Maintains a work deque drained one item at a time through a lazily
    chained Deferred (self._lazy_tail), plus path, logging, and stats-counter
    helpers.  Subclasses must provide _process(item) and
    _when_queue_is_empty().  Hooks 'processed' and 'started' exist for tests.
    """

    def __init__(self, client, local_path_u, db, name, clock):
        # client: the Tahoe client (nickname, log, stats_provider).
        # local_path_u: unicode abspath of the local magic folder.
        # db: magicfolderdb instance.
        # name: 'uploader' or 'downloader'; used in log and counter names.
        # clock: IReactorTime provider (real reactor or test Clock).
        self._client = client
        self._local_path_u = local_path_u
        self._local_filepath = to_filepath(local_path_u)
        self._db = db
        self._name = name
        self._clock = clock
        self._hooks = {'processed': None, 'started': None}
        self.started_d = self.set_hook('started')

        # Fail fast with a configuration-oriented message if the local
        # directory is missing or not actually a directory.
        if not self._local_filepath.exists():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but there is no directory at that location."
                                 % quote_local_unicode_path(self._local_path_u))
        if not self._local_filepath.isdir():
            raise AssertionError("The '[magic_folder] local.directory' parameter was %s "
                                 "but the thing at that location is not a directory."
                                 % quote_local_unicode_path(self._local_path_u))

        self._deque = deque()          # pending work items
        self._lazy_tail = defer.succeed(None)  # tail of the processing chain
        self._stopped = False
        self._turn_delay = 0           # seconds between queue turns

    def _get_filepath(self, relpath_u):
        """Map a unicode relpath (u'/'-separated) to a FilePath under the
        local magic folder."""
        self._log("_get_filepath(%r)" % (relpath_u,))
        return extend_filepath(self._local_filepath, relpath_u.split(u"/"))

    def _get_relpath(self, filepath):
        """Inverse of _get_filepath: FilePath -> u'/'-joined unicode relpath."""
        self._log("_get_relpath(%r)" % (filepath,))
        segments = unicode_segments_from(filepath, self._local_filepath)
        self._log("segments = %r" % (segments,))
        return u"/".join(segments)

    def _count(self, counter_name, delta=1):
        """Bump the stats counter magic_folder.<name>.<counter_name> by delta."""
        ctr = 'magic_folder.%s.%s' % (self._name, counter_name)
        self._log("%s += %r" % (counter_name, delta))
        self._client.stats_provider.count(ctr, delta)

    def _logcb(self, res, msg):
        # Pass-through callback: log msg with the result, return it unchanged.
        self._log("%s: %r" % (msg, res))
        return res

    def _log(self, msg):
        # Logs to the Tahoe log and echoes to stdout (debugging aid).
        s = "Magic Folder %s %s: %s" % (quote_output(self._client.nickname), self._name, msg)
        self._client.log(s)
        print s
        #open("events", "ab+").write(msg)

    def _turn_deque(self):
        """Take one item off the deque and chain its processing onto
        _lazy_tail; when the deque is empty, defer to the subclass's
        _when_queue_is_empty().  Stops permanently once _stopped is set.

        NOTE(review): deque.extend appends right and deque.pop takes from the
        right, so items are processed LIFO -- confirm that ordering is
        intentional.
        """
        self._log("_turn_deque")
        if self._stopped:
            self._log("stopped")
            return
        try:
            item = self._deque.pop()
            self._log("popped %r" % (item,))
            self._count('objects_queued', -1)
        except IndexError:
            self._log("deque is now empty")
            self._lazy_tail.addCallback(lambda ign: self._when_queue_is_empty())
        else:
            # Process the item, fire the 'processed' hook (even on failure),
            # swallow/log errors, then schedule the next turn after
            # _turn_delay seconds.
            self._lazy_tail.addCallback(lambda ign: self._process(item))
            self._lazy_tail.addBoth(self._call_hook, 'processed')
            self._lazy_tail.addErrback(log.err)
            self._lazy_tail.addCallback(lambda ign: task.deferLater(self._clock, self._turn_delay, self._turn_deque))
175
176
class Uploader(QueueMixin):
    """Watches the local magic folder via inotify and uploads new or changed
    files, directory markers, and deletion markers to the upload DMD.
    Also performs a naive periodic full rescan of the local tree.
    """

    def __init__(self, client, local_path_u, db, upload_dirnode, pending_delay, clock,
                 immediate=False):
        # upload_dirnode: writeable IDirectoryNode for this client's DMD.
        # pending_delay: inotify coalescing delay (seconds), applied only if
        #   the notifier supports set_pending_delay.
        # immediate: test-only; turn the queue synchronously when queueing.
        QueueMixin.__init__(self, client, local_path_u, db, 'uploader', clock)

        self.is_ready = False
        self._immediate = immediate

        # Validate the DMD: must be a known, writeable directory node.
        if not IDirectoryNode.providedBy(upload_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))
        if upload_dirnode.is_unknown() or upload_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a writecap to a directory."
                                 % os.path.join('private', 'magic_folder_dircap'))

        self._upload_dirnode = upload_dirnode
        self._inotify = get_inotify_module()
        self._notifier = self._inotify.INotify()
        self._pending = set()  # of unicode relpaths

        self._periodic_full_scan_duration = 10 * 60 # perform a full scan every 10 minutes

        if hasattr(self._notifier, 'set_pending_delay'):
            self._notifier.set_pending_delay(pending_delay)

        # TODO: what about IN_MOVE_SELF and IN_UNMOUNT?
        #
        # IN_ONLYDIR restricts the watch itself to directories;
        # IN_EXCL_UNLINK suppresses events for already-unlinked children.
        self.mask = ( self._inotify.IN_CREATE
                    | self._inotify.IN_CLOSE_WRITE
                    | self._inotify.IN_MOVED_TO
                    | self._inotify.IN_MOVED_FROM
                    | self._inotify.IN_DELETE
                    | self._inotify.IN_ONLYDIR
                    | IN_EXCL_UNLINK
                    )
        self._notifier.watch(self._local_filepath, mask=self.mask, callbacks=[self._notify],
                             recursive=True)

    def start_monitoring(self):
        """Start the inotify reader; fires the 'started' hook.  Returns a
        Deferred that fires when monitoring is up."""
        self._log("start_monitoring")
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._notifier.startReading())
        d.addCallback(lambda ign: self._count('dirs_monitored'))
        d.addBoth(self._call_hook, 'started')
        return d

    def stop(self):
        """Stop inotify and the periodic scan; the returned Deferred fires
        once the in-flight processing chain (_lazy_tail) has drained.

        NOTE(review): self.periodic_callid is only assigned in
        _periodic_full_scan (first reached via start_uploading), so calling
        stop() before start_uploading() would raise AttributeError -- confirm
        callers always start uploading first.
        """
        self._log("stop")
        self._notifier.stopReading()
        self._count('dirs_monitored', -1)
        self.periodic_callid.cancel()
        if hasattr(self._notifier, 'wait_until_stopped'):
            d = self._notifier.wait_until_stopped()
        else:
            d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def start_uploading(self):
        """Mark the uploader ready, re-queue every path known to the db,
        kick off the periodic full scan, and start turning the queue."""
        self._log("start_uploading")
        self.is_ready = True

        all_relpaths = self._db.get_all_relpaths()
        self._log("all relpaths: %r" % (all_relpaths,))

        for relpath_u in all_relpaths:
            self._add_pending(relpath_u)

        self._periodic_full_scan()
        self._extend_queue_and_keep_going(self._pending)

    def _extend_queue_and_keep_going(self, relpaths_u):
        """Append relpaths_u to the work deque and, if ready, schedule a
        queue turn (synchronously when _immediate, else via callLater(0))."""
        self._log("queueing %r" % (relpaths_u,))
        self._deque.extend(relpaths_u)
        self._count('objects_queued', len(relpaths_u))

        if self.is_ready:
            if self._immediate:  # for tests
                self._turn_deque()
            else:
                self._clock.callLater(0, self._turn_deque)

    def _periodic_full_scan(self):
        # Re-schedules itself every _periodic_full_scan_duration seconds,
        # then runs a full scan now.
        self.periodic_callid = self._clock.callLater(self._periodic_full_scan_duration, self._periodic_full_scan)
        self._full_scan()

    def _full_scan(self):
        """Scan the whole local tree (top level only here; subdirectories are
        scanned as their uploads are processed)."""
        print "FULL SCAN"
        self._log("_pending %r" % (self._pending))
        self._scan(u"")

    def _add_pending(self, relpath_u):
        # Track a relpath as pending-upload unless it matches the ignore rules.
        if not magicpath.should_ignore_file(relpath_u):
            self._pending.add(relpath_u)

    def _scan(self, reldir_u):
        # Scan a directory by (synchronously) adding the paths of all its children to self._pending.
        # Note that this doesn't add them to the deque -- the caller later
        # queues self._pending via _extend_queue_and_keep_going.

        self._log("scan %r" % (reldir_u,))
        fp = self._get_filepath(reldir_u)
        try:
            children = listdir_filepath(fp)
        except EnvironmentError:
            raise Exception("WARNING: magic folder: permission denied on directory %s"
                            % quote_filepath(fp))
        except FilenameEncodingError:
            raise Exception("WARNING: magic folder: could not list directory %s due to a filename encoding error"
                            % quote_filepath(fp))

        for child in children:
            _assert(isinstance(child, unicode), child=child)
            self._add_pending("%s/%s" % (reldir_u, child) if reldir_u != u"" else child)

    def is_pending(self, relpath_u):
        """True when relpath_u is awaiting upload (used by the Downloader
        for conflict detection)."""
        return relpath_u in self._pending

    def _notify(self, opaque, path, events_mask):
        """inotify callback: translate an event into a queued relpath,
        filtering out non-directory IN_CREATE, already-pending, and
        ignorable paths."""
        self._log("inotify event %r, %r, %r\n" % (opaque, path, ', '.join(self._inotify.humanReadableMask(events_mask))))
        relpath_u = self._get_relpath(path)

        # We filter out IN_CREATE events not associated with a directory.
        # Acting on IN_CREATE for files could cause us to read and upload
        # a possibly-incomplete file before the application has closed it.
        # There should always be an IN_CLOSE_WRITE after an IN_CREATE, I think.
        # It isn't possible to avoid watching for IN_CREATE at all, because
        # it is the only event notified for a directory creation.

        if ((events_mask & self._inotify.IN_CREATE) != 0 and
            (events_mask & self._inotify.IN_ISDIR) == 0):
            self._log("ignoring event for %r (creation of non-directory)\n" % (relpath_u,))
            return
        if relpath_u in self._pending:
            self._log("not queueing %r because it is already pending" % (relpath_u,))
            return
        if magicpath.should_ignore_file(relpath_u):
            self._log("ignoring event for %r (ignorable path)" % (relpath_u,))
            return

        self._pending.add(relpath_u)
        self._extend_queue_and_keep_going([relpath_u])

    def _when_queue_is_empty(self):
        # Uploader does nothing when idle; inotify events re-start the queue.
        return defer.succeed(None)

    def _process(self, relpath_u):
        # Uploader: upload one relpath.  Handles four cases -- disappeared
        # object (upload a deletion marker), symlink (warn and skip),
        # directory (create the DMD entry, re-watch, rescan), and regular
        # file (upload a new version if changed).
        self._log("_process(%r)" % (relpath_u,))
        if relpath_u is None:
            return
        precondition(isinstance(relpath_u, unicode), relpath_u)
        precondition(not relpath_u.endswith(u'/'), relpath_u)

        d = defer.succeed(None)

        def _maybe_upload(val, now=None):
            # now: injectable timestamp for tests; defaults to time.time().
            if now is None:
                now = time.time()
            fp = self._get_filepath(relpath_u)
            pathinfo = get_pathinfo(unicode_from_filepath(fp))

            self._log("about to remove %r from pending set %r" %
                      (relpath_u, self._pending))
            self._pending.remove(relpath_u)
            encoded_path_u = magicpath.path2magic(relpath_u)

            if not pathinfo.exists:
                # FIXME merge this with the 'isfile' case.
                self._log("notified object %s disappeared (this is normal)" % quote_filepath(fp))
                self._count('objects_disappeared')

                db_entry = self._db.get_db_entry(relpath_u)
                if db_entry is None:
                    # Never uploaded, now gone: nothing to record.
                    return None

                last_downloaded_timestamp = now  # is this correct?

                if is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return

                # Upload an empty file marked 'deleted' as the new version.
                metadata = { 'version': new_version,
                             'deleted': True,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                empty_uploadable = Data("", self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, empty_uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            elif pathinfo.islink:
                self.warn("WARNING: cannot upload symlink %s" % quote_filepath(fp))
                return None
            elif pathinfo.isdir:
                # Watch the new subdirectory unless the notifier already
                # picks up new subdirectories of a recursive watch.
                if not getattr(self._notifier, 'recursive_includes_new_subdirectories', False):
                    self._notifier.watch(fp, mask=self.mask, callbacks=[self._notify], recursive=True)

                # Directories are represented in the DMD as an empty file
                # whose encoded name ends with the magic '/' encoding.
                uploadable = Data("", self._client.convergence)
                encoded_path_u += magicpath.path2magic(u"/")
                self._log("encoded_path_u =  %r" % (encoded_path_u,))
                upload_d = self._upload_dirnode.add_file(encoded_path_u, uploadable, metadata={"version":0}, overwrite=True)
                def _dir_succeeded(ign):
                    self._log("created subdirectory %r" % (relpath_u,))
                    self._count('directories_created')
                def _dir_failed(f):
                    self._log("failed to create subdirectory %r" % (relpath_u,))
                    return f
                upload_d.addCallbacks(_dir_succeeded, _dir_failed)
                # Scan the new directory's children and queue them too.
                upload_d.addCallback(lambda ign: self._scan(relpath_u))
                upload_d.addCallback(lambda ign: self._extend_queue_and_keep_going(self._pending))
                return upload_d
            elif pathinfo.isfile:
                db_entry = self._db.get_db_entry(relpath_u)

                last_downloaded_timestamp = now

                if db_entry is None:
                    new_version = 0
                elif is_new_file(pathinfo, db_entry):
                    new_version = db_entry.version + 1
                else:
                    self._log("Not uploading %r" % (relpath_u,))
                    self._count('objects_not_uploaded')
                    return None

                metadata = { 'version': new_version,
                             'last_downloaded_timestamp': last_downloaded_timestamp }
                if db_entry is not None and db_entry.last_downloaded_uri is not None:
                    metadata['last_downloaded_uri'] = db_entry.last_downloaded_uri

                uploadable = FileName(unicode_from_filepath(fp), self._client.convergence)
                d2 = self._upload_dirnode.add_file(encoded_path_u, uploadable,
                                                   metadata=metadata, overwrite=True)

                def _add_db_entry(filenode):
                    filecap = filenode.get_uri()
                    last_downloaded_uri = metadata.get('last_downloaded_uri', None)
                    self._db.did_upload_version(relpath_u, new_version, filecap,
                                                last_downloaded_uri, last_downloaded_timestamp,
                                                pathinfo)
                    self._count('files_uploaded')
                d2.addCallback(_add_db_entry)
                return d2
            else:
                # Not a file, dir, or symlink (fifo, device, ...): skip.
                self.warn("WARNING: cannot process special file %s" % quote_filepath(fp))
                return None

        d.addCallback(_maybe_upload)

        def _succeeded(res):
            self._count('objects_succeeded')
            return res
        def _failed(f):
            self._count('objects_failed')
            self._log("%s while processing %r" % (f, relpath_u))
            return f
        d.addCallbacks(_succeeded, _failed)
        return d

    def _get_metadata(self, encoded_path_u):
        """Fetch DMD metadata for encoded_path_u; returns a Deferred, or a
        Failure if the entry is missing (synchronous KeyError)."""
        try:
            d = self._upload_dirnode.get_metadata_for(encoded_path_u)
        except KeyError:
            return Failure()
        return d

    def _get_filenode(self, encoded_path_u):
        """Fetch the DMD child node for encoded_path_u; returns a Deferred,
        or a Failure if the entry is missing (synchronous KeyError)."""
        try:
            d = self._upload_dirnode.get(encoded_path_u)
        except KeyError:
            return Failure()
        return d
462
463
class WriteFileMixin(object):
    """Helpers for writing downloaded content to disk safely: temp-file
    writes with atomic-ish replacement, plus conflict and deletion renames.
    Expects the using class to provide self._umask and self._log.
    """
    # Replacement files get an mtime this many seconds in the past, so a
    # subsequent local edit is seen as newer.
    FUDGE_SECONDS = 10.0

    def _get_conflicted_filename(self, abspath_u):
        # The on-disk marker name for a conflicted copy of abspath_u.
        return abspath_u + u".conflict"

    def _write_downloaded_file(self, abspath_u, file_contents, is_conflict=False, now=None):
        """Write file_contents for abspath_u; returns the path actually
        written (the original path, or the .conflict path)."""
        self._log("_write_downloaded_file(%r, <%d bytes>, is_conflict=%r, now=%r)"
                  % (abspath_u, len(file_contents), is_conflict, now))

        # 1. Write a temporary file, say .foo.tmp.
        # 2. is_conflict determines whether this is an overwrite or a conflict.
        # 3. Set the mtime of the replacement file to be T seconds before the
        #    current local time.
        # 4. Perform a file replacement with backup filename foo.backup,
        #    replaced file foo, and replacement file .foo.tmp. If any step of
        #    this operation fails, reclassify as a conflict and stop.
        #
        # Returns the path of the destination file.

        precondition_abspath(abspath_u)
        replacement_path_u = abspath_u + u".tmp"  # FIXME more unique
        backup_path_u = abspath_u + u".backup"
        if now is None:
            now = time.time()

        # ensure parent directory exists
        head, tail = os.path.split(abspath_u)

        # Apply the configured umask while creating the directory and file,
        # restoring the process umask afterwards.
        old_mask = os.umask(self._umask)
        try:
            fileutil.make_dirs(head, (~ self._umask) & 0777)
            fileutil.write(replacement_path_u, file_contents)
        finally:
            os.umask(old_mask)

        # atime=now, mtime=now-FUDGE_SECONDS (see FUDGE_SECONDS above).
        os.utime(replacement_path_u, (now, now - self.FUDGE_SECONDS))
        if is_conflict:
            print "0x00 ------------ <><> is conflict; calling _rename_conflicted_file... %r %r" % (abspath_u, replacement_path_u)
            return self._rename_conflicted_file(abspath_u, replacement_path_u)
        else:
            try:
                fileutil.replace_file(abspath_u, replacement_path_u, backup_path_u)
                return abspath_u
            except fileutil.ConflictError:
                # Replacement raced with a local change: reclassify as conflict.
                return self._rename_conflicted_file(abspath_u, replacement_path_u)

    def _rename_conflicted_file(self, abspath_u, replacement_path_u):
        """Move the replacement file aside to abspath_u + '.conflict' and
        return that conflict path."""
        self._log("_rename_conflicted_file(%r, %r)" % (abspath_u, replacement_path_u))

        conflict_path_u = self._get_conflicted_filename(abspath_u)
        print "XXX rename %r %r" % (replacement_path_u, conflict_path_u)
        if os.path.isfile(replacement_path_u):
            print "%r exists" % (replacement_path_u,)
        if os.path.isfile(conflict_path_u):
            print "%r exists" % (conflict_path_u,)

        fileutil.rename_no_overwrite(replacement_path_u, conflict_path_u)
        return conflict_path_u

    def _rename_deleted_file(self, abspath_u):
        """Rename a remotely-deleted file to abspath_u + '.backup'; tolerate
        the file already being gone.  Returns abspath_u."""
        self._log('renaming deleted file to backup: %s' % (abspath_u,))
        try:
            fileutil.rename_no_overwrite(abspath_u, abspath_u + u'.backup')
        except OSError:
            self._log("Already gone: '%s'" % (abspath_u,))
        return abspath_u
531
532
class Downloader(QueueMixin, WriteFileMixin):
    """Periodically scans every DMD in the collective and downloads files
    whose remote version is newer than the local db's record, writing them
    to disk via WriteFileMixin (detecting conflicts along the way).
    """
    REMOTE_SCAN_INTERVAL = 3  # facilitates tests

    def __init__(self, client, local_path_u, db, collective_dirnode,
                 upload_readonly_dircap, clock, is_upload_pending, umask):
        # collective_dirnode: readonly IDirectoryNode of the collective.
        # upload_readonly_dircap: our own DMD's readcap, used to skip our
        #   own DMD during remote scans.
        # is_upload_pending: callable(relpath_u) -> bool from the Uploader,
        #   used for conflict detection.
        # umask: applied when writing downloaded files.
        QueueMixin.__init__(self, client, local_path_u, db, 'downloader', clock)

        # Validate the collective: must be a known, readonly directory node.
        if not IDirectoryNode.providedBy(collective_dirnode):
            raise AssertionError("The URI in '%s' does not refer to a directory."
                                 % os.path.join('private', 'collective_dircap'))
        if collective_dirnode.is_unknown() or not collective_dirnode.is_readonly():
            raise AssertionError("The URI in '%s' is not a readonly cap to a directory."
                                 % os.path.join('private', 'collective_dircap'))

        self._collective_dirnode = collective_dirnode
        self._upload_readonly_dircap = upload_readonly_dircap
        self._is_upload_pending = is_upload_pending
        self._umask = umask

    def start_downloading(self):
        """Do an initial remote scan (including our own DMD) and start
        turning the download queue."""
        self._log("start_downloading")
        files = self._db.get_all_relpaths()
        self._log("all files %s" % files)

        d = self._scan_remote_collective(scan_self=True)
        d.addBoth(self._logcb, "after _scan_remote_collective 0")
        self._turn_deque()
        return d

    def stop(self):
        """Stop queue turning; the returned Deferred fires once the
        in-flight processing chain has drained."""
        self._stopped = True
        d = defer.succeed(None)
        d.addCallback(lambda ign: self._lazy_tail)
        return d

    def _should_download(self, relpath_u, remote_version):
        """
        _should_download returns a bool indicating whether or not a remote object should be downloaded.
        We check the remote metadata version against our magic-folder db version number;
        latest version wins.
        """
        self._log("_should_download(%r, %r)" % (relpath_u, remote_version))
        if magicpath.should_ignore_file(relpath_u):
            self._log("nope")
            return False
        self._log("yep")
        db_entry = self._db.get_db_entry(relpath_u)
        if db_entry is None:
            return True
        self._log("version %r" % (db_entry.version,))
        return (db_entry.version < remote_version)

    def _get_local_latest(self, relpath_u):
        """
        _get_local_latest takes a unicode path string checks to see if this file object
        exists in our magic-folder db; if not then return None
        else check for an entry in our magic-folder db and return the version number.
        """
        # Also returns None when the file does not exist locally on disk.
        if not self._get_filepath(relpath_u).exists():
            return None
        db_entry = self._db.get_db_entry(relpath_u)
        return None if db_entry is None else db_entry.version

    def _get_collective_latest_file(self, filename):
        """
        _get_collective_latest_file takes a file path pointing to a file managed by
        magic-folder and returns a deferred that fires with the two tuple containing a
        file node and metadata for the latest version of the file located in the
        magic-folder collective directory.
        """
        collective_dirmap_d = self._collective_dirnode.list()
        def scan_collective(result):
            # Ask every member DMD for its (node, metadata) of filename;
            # collect the answers (failures included) in a DeferredList.
            list_of_deferreds = []
            for dir_name in result.keys():
                # XXX make sure it's a directory
                d = defer.succeed(None)
                d.addCallback(lambda x, dir_name=dir_name: result[dir_name][0].get_child_and_metadata(filename))
                list_of_deferreds.append(d)
            deferList = defer.DeferredList(list_of_deferreds, consumeErrors=True)
            return deferList
        collective_dirmap_d.addCallback(scan_collective)
        def highest_version(deferredList):
            # Pick the successful result with the highest metadata version;
            # (None, None) when no DMD had the file.
            max_version = 0
            metadata = None
            node = None
            for success, result in deferredList:
                if success:
                    if result[1]['version'] > max_version:
                        node, metadata = result
                        max_version = result[1]['version']
            return node, metadata
        collective_dirmap_d.addCallback(highest_version)
        return collective_dirmap_d

    def _scan_remote_dmd(self, nickname, dirnode, scan_batch):
        """List one member's DMD and accumulate (filenode, metadata) pairs
        into scan_batch (keyed by relpath) for every entry that is newer
        than (or absent from) our local state."""
        self._log("_scan_remote_dmd nickname %r" % (nickname,))
        d = dirnode.list()
        def scan_listing(listing_map):
            for encoded_relpath_u in listing_map.keys():
                relpath_u = magicpath.magic2path(encoded_relpath_u)
                self._log("found %r" % (relpath_u,))

                file_node, metadata = listing_map[encoded_relpath_u]
                local_version = self._get_local_latest(relpath_u)
                remote_version = metadata.get('version', None)
                self._log("%r has local version %r, remote version %r" % (relpath_u, local_version, remote_version))

                if local_version is None or remote_version is None or local_version < remote_version:
                    self._log("%r added to download queue" % (relpath_u,))
                    if scan_batch.has_key(relpath_u):
                        scan_batch[relpath_u] += [(file_node, metadata)]
                    else:
                        scan_batch[relpath_u] = [(file_node, metadata)]

        d.addCallback(scan_listing)
        d.addBoth(self._logcb, "end of _scan_remote_dmd")
        return d

    def _scan_remote_collective(self, scan_self=False):
        """Scan every member DMD in the collective (skipping our own unless
        scan_self), then queue the highest-version candidate per relpath
        that _should_download approves."""
        self._log("_scan_remote_collective")
        scan_batch = {}  # path -> [(filenode, metadata)]

        d = self._collective_dirnode.list()
        def scan_collective(dirmap):
            # Chain the per-DMD scans sequentially; a failure in one DMD is
            # logged and does not abort the others.
            d2 = defer.succeed(None)
            for dir_name in dirmap:
                (dirnode, metadata) = dirmap[dir_name]
                if scan_self or dirnode.get_readonly_uri() != self._upload_readonly_dircap:
                    d2.addCallback(lambda ign, dir_name=dir_name, dirnode=dirnode:
                                   self._scan_remote_dmd(dir_name, dirnode, scan_batch))
                    def _err(f, dir_name=dir_name):
                        self._log("failed to scan DMD for client %r: %s" % (dir_name, f))
                        # XXX what should we do to make this failure more visible to users?
                    d2.addErrback(_err)

            return d2
        d.addCallback(scan_collective)

        def _filter_batch_to_deque(ign):
            # For each relpath, keep only the candidate with the highest
            # version, and queue it if it beats our local version.
            self._log("deque = %r, scan_batch = %r" % (self._deque, scan_batch))
            for relpath_u in scan_batch.keys():
                file_node, metadata = max(scan_batch[relpath_u], key=lambda x: x[1]['version'])

                if self._should_download(relpath_u, metadata['version']):
                    self._deque.append( (relpath_u, file_node, metadata) )
                else:
                    self._log("Excluding %r" % (relpath_u,))
                    self._call_hook(None, 'processed')

            self._log("deque after = %r" % (self._deque,))
        d.addCallback(_filter_batch_to_deque)
        return d

    def _when_queue_is_empty(self):
        # Idle behavior: rescan the collective after REMOTE_SCAN_INTERVAL
        # seconds, then turn the queue again.
        d = task.deferLater(self._clock, self.REMOTE_SCAN_INTERVAL, self._scan_remote_collective)
        d.addBoth(self._logcb, "after _scan_remote_collective 1")
        d.addCallback(lambda ign: self._turn_deque())
        return d

    def _process(self, item, now=None):
        # Downloader: handle one (relpath_u, file_node, metadata) item --
        # classify overwrite vs. conflict, then mkdir / rename-deleted /
        # download-and-write as appropriate, and record the result in the db.
        # now: injectable timestamp for tests; defaults to time.time().
        self._log("_process(%r)" % (item,))
        if now is None:
            now = time.time()
        (relpath_u, file_node, metadata) = item
        fp = self._get_filepath(relpath_u)
        abspath_u = unicode_from_filepath(fp)
        conflict_path_u = self._get_conflicted_filename(abspath_u)

        d = defer.succeed(None)

        def do_update_db(written_abspath_u):
            # Record the downloaded version in the db (did_upload_version is
            # shared with the uploader), after sanity-checking that the
            # written file actually exists (unless it was a deletion).
            filecap = file_node.get_uri()
            last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            last_downloaded_uri = filecap
            last_downloaded_timestamp = now
            written_pathinfo = get_pathinfo(written_abspath_u)

            if not written_pathinfo.exists and not metadata.get('deleted', False):
                raise Exception("downloaded object %s disappeared" % quote_local_unicode_path(written_abspath_u))

            self._db.did_upload_version(relpath_u, metadata['version'], last_uploaded_uri,
                                        last_downloaded_uri, last_downloaded_timestamp, written_pathinfo)
            self._count('objects_downloaded')
        def failed(f):
            self._log("download failed: %s" % (str(f),))
            self._count('objects_failed')
            return f

        if os.path.isfile(conflict_path_u):
            # An unresolved conflict marker already exists: refuse to act.
            def fail(res):
                raise ConflictError("download failed: already conflicted: %r" % (relpath_u,))
            d.addCallback(fail)
        else:
            # Conflict detection: a conflict exists when the remote entry's
            # lineage disagrees with what we last downloaded/uploaded, or
            # when we have a local upload pending for the same path.
            is_conflict = False
            db_entry = self._db.get_db_entry(relpath_u)
            dmd_last_downloaded_uri = metadata.get('last_downloaded_uri', None)
            dmd_last_uploaded_uri = metadata.get('last_uploaded_uri', None)
            if db_entry:
                if dmd_last_downloaded_uri is not None and db_entry.last_downloaded_uri is not None:
                    if dmd_last_downloaded_uri != db_entry.last_downloaded_uri:
                        is_conflict = True
                        self._count('objects_conflicted')
                elif dmd_last_uploaded_uri is not None and dmd_last_uploaded_uri != db_entry.last_uploaded_uri:
                    is_conflict = True
                    self._count('objects_conflicted')
                elif self._is_upload_pending(relpath_u):
                    is_conflict = True
                    self._count('objects_conflicted')

            if relpath_u.endswith(u"/"):
                # Trailing slash marks a directory entry.
                if metadata.get('deleted', False):
                    self._log("rmdir(%r) ignored" % (abspath_u,))
                else:
                    self._log("mkdir(%r)" % (abspath_u,))
                    d.addCallback(lambda ign: fileutil.make_dirs(abspath_u))
                    d.addCallback(lambda ign: abspath_u)
            else:
                if metadata.get('deleted', False):
                    d.addCallback(lambda ign: self._rename_deleted_file(abspath_u))
                else:
                    d.addCallback(lambda ign: file_node.download_best_version())
                    d.addCallback(lambda contents: self._write_downloaded_file(abspath_u, contents,
                                                                               is_conflict=is_conflict))

        d.addCallbacks(do_update_db, failed)

        def trap_conflicts(f):
            # Conflicts are an expected outcome, not an error: swallow them
            # so the queue keeps turning.
            f.trap(ConflictError)
            return None
        d.addErrback(trap_conflicts)
        return d