import os
import perforce
import general
__all__ = [
'FilestructurePath',
'PerforcePath',
'DiskPath',
]
[docs]class FilestructurePath(object):
def __init__(self, path):
self._path = general.path_normalize(path)
self._type = None
@classmethod
[docs] def from_path(cls, path, *args, **kwargs):
type = cls.find_type(path, *args, **kwargs)
if type:
instance = type(path, *args, **kwargs)
else:
instance = cls(path)
return instance
@classmethod
[docs] def find_type(cls, path, *args, **kwargs):
types = FilestructurePath.__subclasses__()
types.sort(key=lambda x: x.priority, reverse=True)
type = None
for _type in types:
try:
_type(path, *args, **kwargs)
except Exception:
continue
else:
type = _type
break
return type
@property
def path(self):
return self._path
[docs] def parent(self):
return os.path.dirname(self.path)
[docs] def name(self):
return os.path.basename(self.path)
@property
def type(self):
return self._type
[docs]class PerforcePath(FilestructurePath):
priority = 40
_whereKeys = {
'clientFile': True,
'path': True,
'depotFile': True,
}
_fileinfoKeys = {
'rev': True,
'time': True,
'actio': True,
'type': True,
'depotFile': True,
'change': True
}
_statsKeys = {
'clientFile': True,
'path': True,
'depotFile': True,
'headType': False,
'headRev': False,
'headChange': False,
'headTime': False,
'headModTime': False,
'headAction': False,
'isMapped': False,
'haveRev': False,
'otherOpen': False,
'change': False,
}
def __init__(self, path, *args, **kwargs):
super(PerforcePath, self).__init__(path)
self._p4 = kwargs.pop('p4', None)
self._clientData = kwargs.pop('clientData', None)
self._where = kwargs.pop('where', None)
self._client = None
self._perforce_root = None
self._data = {}
self._loaded_cmds = set()
self._input_path = path
if path == '/':
raise Exception("Can't create a Perforce instance from your system root path: {0}".format(path))
if path[0:2] == "//":
self._data['depotFile'] = general.path_normalize(path)
else:
self._data['path'] = general.path_normalize(path)
if kwargs.get('validate', True):
self.validate()
@property
def p4(self):
return self._p4
@p4.setter
def p4(self, value):
self._p4 = value
@property
def local_path(self):
if 'path' in self._data:
return self._data['path']
return self.where.get('path')
@property
def depot_path(self):
if 'depotFile' in self._data:
return self._data['depotFile']
return self.where.get('depotFile')
[docs] def tracked(self):
self.refresh()
if self.stats and 'headRev' in self.stats:
return True
return False
[docs] def get_stats(self):
"""
Get Perforces stats (fstat) for this Perforce path
This always retrieves the latest information, no caching
"""
result = {}
if self.p4.client:
with perforce.TempP4ExceptionLevel(self.p4, 1):
stats = self.p4.run_fstat("-Op", self._input_path)
if stats:
result = stats[0]
return result
@property
def stats(self):
"""
Get Perforces stats (fstat) for this Perforce path
Uses caching
"""
result = {}
cached = True
for k in self._statsKeys:
if k not in self._data:
if self._statsKeys[k]:
cached = False
break
else:
result[k] = self._data[k]
if cached or 'stats' in self._loaded_cmds:
return result
stats = self.get_stats()
self._data.update(stats)
self._loaded_cmds.add('stats')
return stats
@stats.setter
def stats(self, value):
if value is None:
# Clear cache
for k in self._statsKeys:
if k in self._data:
del self._data[k]
if 'stats' in self._loaded_cmds:
self._loaded_cmds.remove('stats')
return
if not isinstance(value, dict):
raise TypeError("Expected dict or None for stats, got {0}".format(value))
self._data.update(value)
[docs] def get_revisions(self):
"""
Get Perforces revsions (filelog) for this Perforce path
This always retrieves the latest information, no caching
"""
if self.clientData and self.tracked:
try:
result = self.p4.run_filelog('-L', self.path)
except perforce.P4.P4Exception:
pass
else:
return sorted(result[0].each_revision(), key=lambda r: r.rev)
return []
@property
def revisions(self):
"""
Get Perforces revisions (filelog) for this Perforce path
Uses caching
"""
result = {}
result = self._data.get('revisions', None)
if result is not None:
return result
revisions = self.get_revisions()
self._data['revisions'] = revisions
self._loaded_cmds.add('revisions')
return self._data['revisions']
@revisions.setter
def revisions(self, value):
if value is None:
# Clear cache
if 'revisions' in self._data:
del self._data['revisions']
if 'revisions' in self._loaded_cmds:
self._loaded_cmds.remove('revisions')
return
self._data['revisions'] = value
self._loaded_cmds.add('revisions')
[docs] def get_client_data(self):
try:
return perforce.get_client_data(self._p4)
except ValueError:
pass
except Exception:
pass
return {}
@property
def clientData(self):
if not self._clientData:
self._clientData = self.get_client_data()
return self._clientData
[docs] def get_where(self):
"""
Get Perforces where (where) for this Perforce path
No caching
"""
with perforce.TempP4ExceptionLevel(self.p4, 0):
# "a" allows for matching roots
where = self.p4.run_where(general.join_paths(self._input_path, '_A_'))
if not where:
return {}
where = where[0]
for k, v in where.items():
if v.endswith('_A_'):
where[k] = os.path.dirname(v)
return where
@property
def where(self):
"""
Get Perforces where (where) for this Perforce path
Uses caching
"""
result = {}
cached = True
for k in self._whereKeys:
if k not in self._data:
if self._whereKeys[k]:
cached = False
break
else:
result[k] = self._data[k]
if cached or 'where' in self._loaded_cmds:
return result
where = self.get_where()
self._data.update(where)
self._loaded_cmds.add('where')
return where
@where.setter
def where(self, value):
if value is None:
for k in self._whereKeys:
if k in self._data:
del self._data[k]
if 'where' in self._loaded_cmds:
self._loaded_cmds.remove('where')
return
if not isinstance(value, dict):
raise TypeError("Expected dict or None for where, got {0}".format(value))
self._data.update(value)
[docs] def get_fileinfo(self):
"""
Get Perforces fileinfo (file) for this Perforce path
No caching
"""
with perforce.TempP4ExceptionLevel(self.p4, 1):
result = self.p4.run_files(self._input_path)
if not result:
return {}
return result[0]
@property
def fileinfo(self):
"""
Get Perforces fileinfo (file) for this Perforce path
Uses caching
"""
result = {}
cached = True
for k in self._fileinfoKeys:
if k not in self._data:
if self._fileinfoKeys[k]:
cached = False
break
else:
result[k] = self._data[k]
if cached or 'fileinfo' in self._loaded_cmds:
return result
fileinfo = self.get_fileinfo()
self._data.update(fileinfo)
self._loaded_cmds.add('fileinfo')
return fileinfo
@fileinfo.setter
def fileinfo(self, value):
if value is None:
for k in self._fileinfoKeys:
if k in self._data:
del self._data[k]
if 'fileinfo' in self._loaded_cmds:
self._loaded_cmds.remove('fileinfo')
return
if not isinstance(value, dict):
raise TypeError("Expected dict or None for fileinfo, got {0}".format(value))
self._data.update(value)
[docs] def item_cmp(self, a, b):
"""
Return the comparison for two items.
Sort by isfile/isdir and then by name
"""
t = -cmp(a.isdir, b.isdir)
if t == 0:
return cmp(a.name, b.name)
else:
return t
[docs] def editable(self):
if os.path.exists(self.local_path):
return os.access(self.local_path, os.W_OK)
else:
dirpath = os.path.dirname(self.local_path)
if os.path.exists(dirpath):
return os.access(dirpath, os.W_OK | os.X_OK)
return False
[docs] def deleted(self):
self.refresh()
if self.stats and self.stats.get('headAction', None) == 'delete':
return True
return False
[docs] def latest(self):
if self.clientData and self.tracked and not self.deleted:
if self.revision == max([r.rev for r in self.revisions]):
return True
else:
return False
return True
[docs] def checkedOut(self):
if self.clientData:
result = self.p4.run_opened(self.path)
if result:
return True
return False
@property
def different(self):
if self.revision == 0:
raise ValueError("Can't check if unsynced files have changed")
try:
changes = self.p4.run_diff('-f', self.local_path + '#{0}'.format(self.revision))
except:
return True
if not changes:
return False
if len(changes) > 1:
return True
return False
@property
def revision(self):
return int(self.stats.get('haveRev', 0))
@property
def next_revision(self):
return max([r.rev for r in self.revisions] if self.revisions else [0]) + 1
[docs] def exists(self):
return perforce.is_path_tracked(self.path, p4=self.p4)
[docs] def refresh(self):
if self.isfile():
self._loaded_cmds.clear()
# Clear cache
self.stats = None
self.revisions = None
else:
if self._children is not None:
for c in self._children:
c.refresh()
[docs] def sync(self, revision=None, *args, **kwargs):
rev = ''
result = None
if revision is not None:
try:
rev = '#{0}'.format(int(str(revision).lstrip('#')))
except ValueError:
raise ValueError('invalid revision number: {0}'.format(revision))
try:
if self.isfile():
query = '{0}{1}'.format(self.path, rev)
else:
query = '{0}/...{1}'.format(self.path, rev)
result = self.p4.run_sync(*(list(args) + [query]), **kwargs)
self.refresh()
except perforce.P4.P4Exception, e:
if "file(s) up-to-date" in str(e):
pass
else:
raise
else:
self.refresh()
return result
[docs] def remove(self):
self.sync(revision=0)
[docs] def checkout(self):
if self.isfile:
result = self.p4.run_edit(self.path)
else:
path = self.path
if not path.endswith('/'):
path += '/'
path += '...'
result = self.p4.run_edit(path)
return result
[docs] def revert(self):
if self.isfile:
result = self.p4.run_revert(self.path)
else:
path = self.path
if not path.endswith('/'):
path += '/'
path += '...'
result = self.p4.run_revert(path)
return result
[docs] def revert_unchanged(self):
"""Reverts unchanged file"""
try:
different = self.different
except ValueError:
return True
if different:
return False
self.revert()
return True
[docs] def submit(self, description=None, force=False):
result = []
if description is None:
if self.description is not None:
description = self.description
else:
if self.tracked:
description = 'Updated file {0}'.format(self.perforce_path)
else:
description = 'Added file {0}'.format(self.perforce_path)
self.stats = None
if self.stats.get('change', None):
changeID = self.stats['change']
if changeID != 'default' and not force:
raise ValueError("File is already in changelist: {0}".format(changeID))
# Create changelist
change = perforce.build_changelist(self.p4, [self.path], description)
# Save changelist
change = self.p4.save_change(change)
# Get changelist id
changeID = change[0].split()[1]
# Submit via id
args = ['-c', changeID]
result = self.p4.run_submit(*args)
num = result[-1]['submittedChange']
self.refresh()
return self.p4.fetch_change(num)
[docs] def get_children(self, includeDirs=True, includeFiles=True):
if not self.isdir():
return None
def wrap_instances(cls, paths):
instances = []
for path in paths:
instances.append(cls.__class__(path))
return instances
p4items = []
kwargs = dict(
p4=self.p4,
path=self.path,
)
if includeDirs:
paths = perforce.get_child_dirs(**kwargs)
p4items.extend(wrap_instances(self, paths))
if includeFiles:
paths = perforce.get_child_files(**kwargs)
p4items.extend(wrap_instances(self, paths))
# supplement with disk children
diskItems = []
if self.exists:
kwargs = dict(
includeFiles=includeFiles,
includeDirs=includeDirs,
)
diskItems = general.get_folder_contents(self.local_path, **kwargs)
p4itemNames = [i.name for i in p4items]
new = [i for i in diskItems if os.path.basename(i) not in p4itemNames]
diskItems = wrap_instances(self, new)
# combine and sort
result = p4items + diskItems
result.sort(cmp=self.item_cmp)
# assign indices to children
for i, r in enumerate(result):
r.parent = self
r.index = i
return result
[docs] def read(self, forceP4=False):
if not self.isfile():
return None
if not forceP4:
self.sync()
if forceP4 or not self.exists:
result = self.p4.run_print('-q', self.path)
if isinstance(result, list):
r = result[1]
if isinstance(r, bytearray):
return r.decode('utf-8')
else:
return r
else:
return result
elif self.exists:
try:
with open(self.local_path, 'rb') as fp:
contents = fp.read()
except IOError:
pass
else:
return contents
return None
[docs] def isfile(self):
if not self.exists():
return False
with perforce.TempP4ExceptionLevel(self.p4, 0):
result = self.p4.run_fstat(self.path)
if result:
return True
return False
[docs] def isdir(self):
if not self.exists():
return False
with perforce.TempP4ExceptionLevel(self.p4, 0):
result = self.p4.run_fstat(self.path)
if result:
return False
return True
[docs] def validate(self):
if self.p4 is None:
self.p4 = perforce.get_p4_from_path(self.path)
client = perforce.get_client_from_path(self.p4, self.path)
self.p4.client = client
if not client:
raise ValueError("Couldn't find client for path: {0}".format(self.path))
[docs]class DiskPath(FilestructurePath):
priority = 20
def __init__(self, path):
super(DiskPath, self).__init__(path)
self.validate()
@property
def local_path(self):
return self.path
[docs] def exists(self):
return os.path.exists(self.path)
[docs] def editable(self):
return os.access(self.path, os.W_OK | os.X_OK)
[docs] def read(self):
with open(self.path, 'r') as fp:
data = fp.read()
return data
[docs] def get_children(self):
if self.isfile():
return None
def wrap_instances(cls, paths):
instances = []
for path in paths:
instances.append(cls.__class__(path))
return instances
files = general.get_folder_contents(self.path)
result = wrap_instances(self, files)
return result
[docs] def isfile(self):
return os.path.isfile(self.path)
[docs] def isdir(self):
return os.path.isdir(self.path)
[docs] def validate(self):
if not os.path.exists(self.path):
raise ValueError("Path does not exist on disk")