#!/usr/bin/env python
#
# Author: Martin Matusiak
# Licensed under the GNU Public License, version 3.
#
# Watch directory for changes to files being written

import os
import sys
import time


class Formatter(object):
    """Turn raw times and sizes into short, fixed-width friendly strings."""

    size_units = [' b', 'kb', 'mb', 'gb', 'tb', 'pb', 'eb', 'zb', 'yb']
    time_units = ['sec', 'min', 'hou', 'day', 'mon', 'yea']

    # (threshold, divisor) for each step up the time_units ladder:
    # sec -> min -> hou -> day -> mon -> yea.
    _time_steps = [(59, 60), (59, 60), (23, 24), (29, 30), (11, 12)]

    @classmethod
    def simplify_time(cls, tm):
        """Scale a duration in seconds to the largest fitting unit.

        Returns a tuple ``(value, unit)`` where ``value`` is the rounded
        integer amount and ``unit`` is an entry of ``time_units``.
        """
        unit = 0
        for threshold, divisor in cls._time_steps:
            # Stop climbing as soon as the value fits in the current unit.
            if not tm > threshold:
                break
            tm = float(tm) / divisor
            unit += 1
        return int(round(tm)), cls.time_units[unit]

    @classmethod
    def simplify_filesize(cls, size):
        """Scale a byte count to the largest fitting unit.

        Returns a tuple ``(value, unit)`` where ``value`` is the rounded
        integer amount and ``unit`` is an entry of ``size_units``.
        """
        unit = 0
        last_unit = len(cls.size_units) - 1
        # Cap at the last known unit so huge values cannot index past the list.
        while size > 1023 and unit < last_unit:
            unit += 1
            size = float(size) / 1024
        return int(round(size)), cls.size_units[unit]

    @classmethod
    def mtime(cls, reftime, mtime):
        """Format the age of ``mtime`` relative to ``reftime``, e.g. ``5sec``."""
        delta = int(reftime - mtime)
        tm, unit = cls.simplify_time(delta)
        return "%s%s" % (tm, unit)

    @classmethod
    def filesize(cls, size):
        """Format a byte count, e.g. ``12kb``."""
        size, unit = cls.simplify_filesize(size)
        return "%s%s" % (size, unit)

    @classmethod
    def filesizedelta(cls, size):
        """Format a signed change in byte count, e.g. ``+3kb`` or ``-2mb``.

        The magnitude is simplified independently of the sign so that
        shrinking files are scaled to a unit just like growing ones.
        A change that rounds to zero carries no sign.
        """
        magnitude, unit = cls.simplify_filesize(abs(size))
        if magnitude == 0:
            sign = ""
        elif size > 0:
            sign = "+"
        else:
            sign = "-"
        return "%s%s%s" % (sign, magnitude, unit)

    @classmethod
    def bold(cls, s):
        """Display in bold"""
        term = os.environ.get("TERM")
        # Only emit escape codes when TERM names a terminal other than "dumb".
        if term and term != "dumb":
            return "\033[1m%s\033[0m" % s
        return s


class File(object):
    """Sample history (mtime, size) for a single file name."""

    sample_limit = 60  # don't hold more than x samples

    def __init__(self, file):
        self.file = file
        # List of (mtime, size) tuples, oldest first.
        self.mtimes = []

    def get_name(self):
        """Return the file name this history belongs to."""
        return self.file

    def get_last_mtime(self):
        """Return the mtime of the most recent sample."""
        return self.mtimes[-1][0]

    def get_last_size(self):
        """Return the size of the most recent sample."""
        return self.mtimes[-1][1]

    def get_last_delta(self):
        """Return the size change between the two latest samples.

        Returns 0 when fewer than two samples have been taken.
        """
        if len(self.mtimes) < 2:
            return 0
        return self.mtimes[-1][1] - self.mtimes[-2][1]

    def prune_samples(self):
        """Drop the oldest samples so at most ``sample_limit`` remain."""
        excess = len(self.mtimes) - self.sample_limit
        if excess > 0:
            del self.mtimes[:excess]

    def sample(self, mtime, size):
        """Sample file status"""
        # Update time and size
        self.mtimes.append((mtime, size))
        # Don't keep too many samples
        self.prune_samples()


class Directory(object):
    """Index of the regular files directly inside one directory."""

    def __init__(self, path):
        self.path = path
        # Maps file name -> File.
        self.files = {}

    def prune_files(self):
        """Remove indexed files no longer on disk (by deletion/rename)"""
        # Iterate over a snapshot of the keys: deleting from a dict while
        # iterating it raises RuntimeError on Python 3.
        for name in list(self.files):
            # isfile (not exists) mirrors the filter used by scan_files, so a
            # name that has become a directory is dropped as well.
            if not os.path.isfile(os.path.join(self.path, name)):
                del self.files[name]

    def scan_files(self):
        """Refresh the index: drop vanished files and sample the current ones."""
        # remove duds first
        self.prune_files()

        # find items, grab only files
        for name in os.listdir(self.path):
            full_path = os.path.join(self.path, name)
            if not os.path.isfile(full_path):
                continue

            # The file may vanish between the listing and the stat; skip it
            # for this round rather than crash, the next prune will catch it.
            try:
                st = os.stat(full_path)
            except OSError:
                continue

            # stat files, building/updating index
            if name not in self.files:
                self.files[name] = File(name)
            self.files[name].sample(st.st_mtime, st.st_size)

    def display_line(self, name, time_now, tm, size, sizedelta):
        """Build one output row; rows modified under 6 seconds ago are bold."""
        time_fmt = Formatter.mtime(time_now, tm)
        size_fmt = Formatter.filesize(size)
        sizedelta_fmt = Formatter.filesizedelta(sizedelta)
        line = "%6.6s %5.5s %6.6s %s" % (time_fmt, size_fmt, sizedelta_fmt,
                                         name)
        if time_now - tm < 6:
            line = Formatter.bold(line)
        return line

    def sort_by_name(self, files):
        """Return ``files`` as a list sorted by file name."""
        return sorted(files, key=lambda x: x.get_name())

    def sort_by_mtime(self, files):
        """Return ``files`` as a list sorted by last mtime, then by name."""
        return sorted(files,
                      key=lambda x: (x.get_last_mtime(), x.get_name()))

    def display(self):
        """Print a header and one row per indexed file, oldest mtime first."""
        time_now = time.time()
        files = self.sort_by_mtime(self.files.values())
        print("\nmtime> size> delta> name>")
        for f in files:
            line = self.display_line(f.get_name(), time_now,
                                     f.get_last_mtime(), f.get_last_size(),
                                     f.get_last_delta())
            print(line)


def main(path):
    """Rescan and redisplay ``path`` once a second until interrupted."""
    directory = Directory(path)
    while True:
        try:
            directory.scan_files()
            directory.display()
            time.sleep(1)
        except KeyboardInterrupt:
            print("\rUser terminated")
            return


if __name__ == '__main__':
    try:
        path = sys.argv[1]
    except IndexError:
        print("Usage: %s /path" % sys.argv[0])
        sys.exit(1)
    main(path)