import os
from operator import itemgetter
import general
P4 = None
try:
import P4
except:
# some tools not functional without p4python
pass
__all__ = [
'refresh',
'newInstance',
'is_valid_user',
'TempP4ExceptionLevel',
'get_p4_and_client_from_path',
'get_p4_from_path',
'get_client_data',
'get_client_from_path',
'check_path_in_client',
'find_client',
'get_depots',
'get_depot_paths',
'get_clients',
'get_clients_with_specs',
'get_child_dirs',
'get_child_files',
'build_changelist',
'is_path_tracked',
'is_file_tracked',
'is_dir_tracked',
]
DEFAULT_USER_VALIDATED = False
DEFAULT_LOGGED_IN = False
# Cache p4 instances and clients
P4_INSTANCES = {}
P4_DEPOTS = []
P4_CLIENTS = {}
P4_CLIENT_SPECS = {}
[docs]def refresh():
"""
Delete all cached p4 connections
"""
global P4_INSTANCE
global P4_INSTANCES
global P4_DEPOTS
global P4_CLIENTS
P4_INSTANCE = None
P4_INSTANCES = {}
P4_DEPOTS = []
P4_CLIENTS = {}
[docs]def newInstance(dialog=False):
global DEFAULT_USER_VALIDATED
global DEFAULT_LOGGED_IN
if P4 is None or not hasattr(P4, 'P4'):
raise ValueError("Couldn't find P4 or P4API")
p4 = P4.P4()
if not p4.connected():
p4.connect()
if not DEFAULT_LOGGED_IN:
if dialog:
import gui
gui.login(p4)
else:
try:
p4.run_login("-s")
except P4.P4Exception:
raise ValueError("Couldn't login to P4")
DEFAULT_LOGGED_IN = True
if not DEFAULT_USER_VALIDATED:
if not is_valid_user(p4):
raise ValueError("P4 User Doesn't Exist: {0}".format(p4.user))
DEFAULT_USER_VALIDATED = True
return p4
[docs]def is_valid_user(p4):
users = [x['User'] for x in p4.run_users()]
if p4.user in users:
return True
return False
[docs]class TempP4ExceptionLevel(object):
"""
Temporarily adjust the P4 exception level
Example:
>>> with TempP4ExceptionLevel(p4, 1):
>>> # do stuff
Args:
p4 (`P4.P4`): P4 instance
lvl (`int`): Exception Level
"""
def __init__(self, p4, lvl):
self.p4 = p4
self.lvl = lvl
self.origLvl = None
def __enter__(self):
self.origLvl = self.p4.exception_level
self.p4.exception_level = self.lvl
def __exit__(self, type, value, tb):
self.p4.exception_level = self.origLvl
[docs]def get_p4_and_client_from_path(path):
"""
Given a path, return a p4 instance and clientData if possible.
Will search within the cached instances, otherwise attempts
to find an appropriate client and returns the new data
"""
global P4_INSTANCES
# check root and stream paths of cached client
# data to see if this path is in one of them
for c, data in P4_INSTANCES.items():
root = data[1].get('Root')
depot = data[1].get('Stream')
if (root and general.path_contains(root, path))\
or (depot and general.path_contains(depot, path)):
return data
p4 = newInstance()
# If not logged in, no client can be found
try:
p4.run_login("-s")
except P4.P4Exception:
return p4, None
# Find associated clients
clientData = find_client(p4, path)
# If none are found, we're done
if not clientData:
data = (p4, None)
return p4, None
# check to see if we already have an instance
# with the same client data
client = clientData.get('client')
for c, data in P4_INSTANCES.items():
if c == client:
return data
# Build a new instance and store it
p4 = P4.P4()
p4.client = client
if not p4.connected():
p4.connect()
data = (p4, clientData)
P4_INSTANCES[client] = data
return data
[docs]def get_p4_from_path(path):
p4 = P4.P4()
if not p4.connected():
p4.connect()
try:
p4.run_login("-s")
except P4.P4Exception, e:
raise ValueError("Couldn't login to perforce: {0}".format(e))
client = get_client_from_path(p4, path)
if not client:
raise ValueError("Couldn't find client for path: {0}".format(path))
p4.client = client
if p4.client not in P4_INSTANCES:
P4_INSTANCES[p4.client] = p4
return p4
[docs]def get_client_data(p4):
if not p4.run_clients('-e', p4.client):
raise ValueError("Client Does Not Exist: {0}".format(p4.client))
result = p4.fetch_client(p4.client)
return result
[docs]def get_client_from_path(p4, path, clients=None, includeData=False):
if clients is None:
clients = p4.run_clients('-u', p4.user)
clients.sort(key=lambda c: c['Access'], reverse=True)
if not clients:
return
path = general.path_normalize(path).lower()
# First check for matching clients with matching host
_clients = filter(lambda c: c['Host'] == p4.host, clients)
for _client in _clients:
if check_path_in_client(p4, _client, path):
if includeData:
return _client
return _client['client']
# Search the rest of the clients that don't have a host set
_clients = filter(lambda c: c['Host'] == "", clients)
for _client in _clients:
if check_path_in_client(p4, _client, path):
if includeData:
return _client
return _client['client']
# No Matches
return
[docs]def check_path_in_client(p4, c, path, filterHost=True):
def contains(a, b):
return general.path_contains(a.lower(), b, normalize=False, normcase=False)
# Perforce Depot Path, only need to check root
if path[0:2] == "//":
if 'Stream' in c:
if contains(c['Stream'], path):
# LOG.debug("Client matched by Stream: {0[client]} {0[Stream]}".format(c), extra=dict(indent=1))
return True
# Match based on the roots of the mappings
# Ex:
# //depot/path/... //client/...
# ^^^^^^^^^^^^^^^^
# Lazy load the view data since it requires an additional p4 call
view = c.get('View', None)
if view is None:
_client = p4.fetch_client(c['client'])
c.update(_client)
view = c.get('View', None)
for mapping in c.get('View', []):
root = mapping.split(' ')[0]
if root.startswith('-'):
continue
if root.endswith('...'):
root = root[:-3]
if contains(root, path):
# LOG.debug("Client matched by view mapping root: {0} {1}".format(root, path))
return True
return False
else:
if contains(c['Root'], path):
# LOG.debug("Client matched by Root: {0[client]} {0[Root]}".format(c), extra=dict(indent=1))
return True
for alt in c.get('AltRoots', []):
if contains(alt, path):
# LOG.debug("Client matched by AltRoot: {0[client]} {1}".format(c, alt), extra=dict(indent=1))
return True
return False
[docs]def find_client(p4, path, host=None):
"""
Find a client by root path
"""
if not p4.connected():
raise ValueError("Perforce is not connected")
# determine filter values
if host is None:
host = p4.host.lower()
# find matches
def clientContainsPath(c, path):
_host = c.get('Host', None)
if _host and _host.lower() != host:
return False
if general.path_contains(c['Root'], path):
return True
if 'Stream' in c:
if general.path_contains(c['Stream'], path):
return True
for alt in c.get('AltRoots', []):
if general.path_contains(alt, path):
return True
# Match based on the roots of the mappings
# Ex:
# //depot/path/... //client/...
# ^^^^^^^^^^^^^^^^
for mapping in c.get('View', []):
root = mapping.split(' ')[0]
if root.startswith('-'):
continue
if root.endswith('...'):
root = root[:-3]
if general.path_contains(root, path):
return True
return False
clients = get_clients(p4)
matches = [c for c in clients if clientContainsPath(c, path)]
if not len(matches):
# Check based on the client spec
clients = get_clients_with_specs(p4, clients=clients, ignoreStreams=True)
matches = [c for c in clients if clientContainsPath(c, path)]
# check for explicitly set hostnames
if len(matches) > 1:
import socket
hostname = socket.gethostname()
_matches = filter(lambda m: m['Host'] == hostname, matches)
if len(_matches):
matches = _matches
# check results
if len(matches) > 1:
matches.sort(key=itemgetter('Access'))
elif not len(matches):
return
return matches[0]
[docs]def get_depots(p4=None, refresh=False):
"""
Return a list of all depots
"""
global P4_DEPOTS
if not P4_DEPOTS or refresh:
if p4 is None:
p4 = newInstance()
P4_DEPOTS = p4.run_depots()
return P4_DEPOTS
[docs]def get_depot_paths(p4=None, refresh=False):
"""
From a p4 instance, get a list of all depot paths available
Args:
p4 (P4): perforce instance
Returns:
list of str: Depot paths in the format of
['//{depotName}', ...]
"""
depots = get_depots(p4, refresh)
return ['//{0[name]}'.format(d) for d in depots]
[docs]def get_clients(p4=None, refresh=False):
"""
Return a list of all clients
"""
if p4 is None:
p4 = newInstance()
if p4.user not in P4_CLIENTS or refresh:
P4_CLIENTS[p4.user] = p4.run_clients('-u', p4.user)
return P4_CLIENTS[p4.user]
[docs]def get_clients_with_specs(p4=None, clients=None, ignoreStreams=False, useCache=True):
"""
Get a list of clients with their full specs
This contains the view mappings
"""
if p4 is None:
p4 = newInstance()
result = []
if clients is None:
clients = get_clients(p4)
clientsToMatch = []
if useCache:
for client in clients:
clientName = client['client']
if clientName in P4_CLIENT_SPECS:
result.append(P4_CLIENT_SPECS[clientName])
else:
clientsToMatch.append(client)
else:
clientsToMatch = clients[:]
for i, client in enumerate(clientsToMatch):
clientName = client['client']
if ignoreStreams and client.get('Stream', None):
continue
data = p4.run_client('-o', client['client'])[-1]
newData = dict(client.items())
newData.update(data)
P4_CLIENT_SPECS[clientName] = newData
result.append(newData)
return result
[docs]def get_child_dirs(p4, path, **kwargs):
"""
Return all child directories of the given path as strings
Kwargs are passed to filter_item
"""
searchPath = "{0}/*".format(path)
result = []
try:
dirsResult = p4.run_dirs(searchPath)
except P4.P4Exception:
dirsResult = []
for r in dirsResult:
path = general.path_normalize(r['dir'])
basename = os.path.basename(path)
if general.filter_item(basename, **kwargs):
result.append(path)
return result
[docs]def get_child_files(p4, path, includeDeleted=False, **kwargs):
"""
Return all perforce files inside the given directory as strings
"""
searchPath = "{0}/*".format(path)
result = []
try:
filesResult = p4.run_files(searchPath)
except P4.P4Exception:
filesResult = []
for f in filesResult:
deleted = f.get('action', '').endswith('delete')
if not deleted or includeDeleted:
path = general.path_normalize(f['depotFile'])
basename = os.path.basename(path)
if general.filter_item(basename, **kwargs):
result.append(path)
return result
[docs]def build_changelist(p4, files, description):
change = p4.fetch_change()
change._files = files
change._description = description
return change
[docs]def is_path_tracked(path, p4=None):
if p4 is None:
p4 = get_p4_from_path(path)
return is_file_tracked(path, p4) or is_dir_tracked(path, p4)
[docs]def is_file_tracked(path, p4=None):
"""
Return True if the given file is currently tracked in perforce
"""
if p4 is None:
p4 = get_p4_from_path(path)
try:
info = p4.run_files(path)
except P4.P4Exception:
return False
else:
if len(p4.errors):
return False
if len(info):
if info[0]['action'] == 'delete':
# File was deleted, and thus has to be readded
return False
return True
return False
[docs]def is_dir_tracked(path, p4=None):
""" Return True if the given directory is currently tracked in perforce """
if p4 is None:
p4 = get_p4_from_path(path)
try:
info = p4.run_dirs(path)
except P4.P4Exception:
return False
else:
if len(p4.errors):
return False
return bool(len(info))