Skip to content

Commit

Permalink
Removed getxattr in favor of statx returning all xattrs
Browse files Browse the repository at this point in the history
The test has been adapted to require xattrs in the stat output
  • Loading branch information
glpatcern committed Aug 10, 2024
1 parent 3198389 commit a8217ef
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 77 deletions.
62 changes: 9 additions & 53 deletions src/core/cs3iface.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ def init(inconfig, inlog):
ctx['lockasattr'] = inconfig.getboolean('cs3', 'lockasattr', fallback=False)
ctx['locknotimpl'] = False
ctx['revagateway'] = inconfig.get('cs3', 'revagateway')
ctx['xattrcache'] = {} # this is a map cs3ref -> arbitrary_metadata as returned by Stat()
ctx['grpc_timeout'] = inconfig.getint('cs3', "grpctimeout", fallback=10)
ctx['http_timeout'] = inconfig.getint('cs3', "httptimeout", fallback=10)
# prepare the gRPC channel and validate that the revagateway gRPC server is ready
Expand Down Expand Up @@ -97,11 +96,6 @@ def _getcs3reference(endpoint, fileref):
return ref


def _hashedref(endpoint, fileref):
'''Returns an hashable key for the given endpoint and file reference'''
return str(endpoint) + str(fileref)


def authenticate_for_test(userid, userpwd):
'''Use basic authentication against Reva for testing purposes'''
authReq = cs3gw.AuthenticateRequest(type='basic', client_id=userid, client_secret=userpwd)
Expand Down Expand Up @@ -148,14 +142,14 @@ def stat(endpoint, fileref, userid, versioninv=1):
log.info('msg="Invoked stat" fileref="%s" trace="%s" inode="%s" filepath="%s" elapsedTimems="%.1f"' %
(fileref, statInfo.status.trace, inode, filepath, (tend-tstart)*1000))
# cache the xattrs map prior to returning; note we're never cleaning this cache and let it grow indefinitely
ctx['xattrcache'][_hashedref(endpoint, fileref)] = statInfo.info.arbitrary_metadata.metadata
return {
'inode': inode,
'filepath': filepath,
'ownerid': statInfo.info.owner.opaque_id + '@' + statInfo.info.owner.idp,
'size': statInfo.info.size,
'mtime': statInfo.info.mtime.seconds,
'etag': statInfo.info.etag,
'xattrs': statInfo.info.arbitrary_metadata.metadata
}


Expand All @@ -169,11 +163,6 @@ def setxattr(endpoint, filepath, userid, key, value, lockmd):
ref = _getcs3reference(endpoint, filepath)
md = cs3spr.ArbitraryMetadata()
md.metadata.update({key: str(value)}) # pylint: disable=no-member
try:
ctx['xattrcache'][_hashedref(endpoint, filepath)][key] = str(value)
except KeyError:
# we did not have this file in the cache, ignore
pass
lockid = None
if lockmd:
_, lockid = lockmd
Expand All @@ -192,38 +181,6 @@ def setxattr(endpoint, filepath, userid, key, value, lockmd):
log.debug(f'msg="Invoked setxattr" result="{res}"')


def getxattr(endpoint, filepath, userid, key):
    '''Get the extended attribute <key> using the given userid as access token.

    The attributes are served from ctx['xattrcache'] when the file is already there,
    otherwise a Stat is executed against the gateway and its arbitrary metadata is cached.
    Returns the value of the attribute, or None if the file is not found, the key is
    missing, or the value is an empty string. Raises IOError if Stat fails otherwise.
    '''
    ref = _getcs3reference(endpoint, filepath)
    cachekey = _hashedref(endpoint, filepath)
    cache = ctx['xattrcache']
    statres = None    # stays None when the attributes are served from the cache

    if cachekey not in cache:
        # cache miss: execute a Stat and store the returned metadata map
        tstart = time.time()
        statres = ctx['cs3gw'].Stat(request=cs3sp.StatRequest(ref=ref), metadata=[('x-access-token', userid)])
        tend = time.time()
        statcode = statres.status.code
        if statcode == cs3code.CODE_NOT_FOUND:
            log.debug(f'msg="Invoked stat for getxattr on missing file" filepath="{filepath}"')
            return None
        if statcode != cs3code.CODE_OK:
            log.error('msg="Failed to stat" filepath="%s" userid="%s" trace="%s" key="%s" reason="%s"' %
                      (filepath, userid[-20:], statres.status.trace, key, statres.status.message.replace('"', "'")))
            raise IOError(statres.status.message)
        log.debug(f'msg="Invoked stat for getxattr" filepath="{filepath}" elapsedTimems="{(tend - tstart) * 1000:.1f}"')
        cache[cachekey] = statres.info.arbitrary_metadata.metadata

    # an empty string is handled like a missing key (presumably because the metadata map
    # may yield an empty default for unset keys - to be confirmed)
    try:
        value = cache[cachekey][key]
    except KeyError:
        value = ''
    if value == '':
        log.info('msg="Empty value or key not found in getxattr" filepath="%s" key="%s" trace="%s" metadata="%s"' %
                 (filepath, key, statres.status.trace if statres else 'N/A', cache[cachekey]))
        return None

    if not statres:
        log.debug(f'msg="Returning cached attr on getxattr" filepath="{filepath}" key="{key}"')
    return value


def rmxattr(endpoint, filepath, userid, key, lockmd):
'''Remove the extended attribute <key> using the given userid as access token'''
ref = _getcs3reference(endpoint, filepath)
Expand All @@ -240,11 +197,6 @@ def rmxattr(endpoint, filepath, userid, key, lockmd):
log.error('msg="Failed to rmxattr" filepath="%s" trace="%s" key="%s" reason="%s"' %
(filepath, key, res.status.trace, res.status.message.replace('"', "'")))
raise IOError(res.status.message)
try:
del ctx['xattrcache'][_hashedref(endpoint, filepath)][key]
except KeyError:
# we did not have this file in the cache, ignore
pass
log.debug(f'msg="Invoked rmxattr" result="{res.status}"')


Expand All @@ -253,7 +205,8 @@ def setlock(endpoint, filepath, userid, appname, value):
if ctx['lockasattr'] and ctx['locknotimpl']:
log.debug(f'msg="Using xattrs to execute setlock" filepath="{filepath}" value="{value}"')
try:
currvalue = getxattr(endpoint, filepath, userid, LOCK_ATTR_KEY)
filemd = stat(endpoint, filepath, userid)
currvalue = filemd['xattrs'][LOCK_ATTR_KEY]
log.info('msg="Invoked setlock on an already locked entity" filepath="%s" appname="%s" previouslock="%s"' %
(filepath, appname, currvalue))
raise IOError(common.EXCL_ERROR)
Expand Down Expand Up @@ -287,7 +240,8 @@ def getlock(endpoint, filepath, userid):
if ctx['lockasattr'] and ctx['locknotimpl']:
log.debug(f'msg="Using xattrs to execute getlock" filepath="{filepath}"')
try:
currvalue = getxattr(endpoint, filepath, userid, LOCK_ATTR_KEY)
filemd = stat(endpoint, filepath, userid)
currvalue = filemd['xattrs'][LOCK_ATTR_KEY]
return {
'lock_id': currvalue.split('!')[1],
'type': 2, # LOCK_TYPE_WRITE, though this is advisory!
Expand Down Expand Up @@ -333,7 +287,8 @@ def refreshlock(endpoint, filepath, userid, appname, value, oldvalue=None):
if ctx['lockasattr'] and ctx['locknotimpl']:
log.debug(f'msg="Using xattrs to execute setlock" filepath="{filepath}" value="{value}"')
try:
currvalue = getxattr(endpoint, filepath, userid, LOCK_ATTR_KEY)
filemd = stat(endpoint, filepath, userid)
currvalue = filemd['xattrs'][LOCK_ATTR_KEY]
if currvalue.split('!')[0] == appname and (not oldvalue or currvalue.split('!')[1] == oldvalue):
raise KeyError
log.info('msg="Failed precondition on refreshlock" filepath="%s" appname="%s" previouslock="%s"' %
Expand Down Expand Up @@ -369,7 +324,8 @@ def unlock(endpoint, filepath, userid, appname, value):
if ctx['lockasattr'] and ctx['locknotimpl']:
log.debug(f'msg="Using xattrs to execute unlock" filepath="{filepath}" value="{value}"')
try:
currvalue = getxattr(endpoint, filepath, userid, LOCK_ATTR_KEY)
filemd = stat(endpoint, filepath, userid)
currvalue = filemd['xattrs'][LOCK_ATTR_KEY]
if currvalue.split('!')[0] == appname and currvalue.split('!')[1] == value:
raise KeyError
log.info('msg="Failed precondition on unlock" filepath="%s" appname="%s" previouslock="%s"' %
Expand Down
16 changes: 13 additions & 3 deletions src/core/localiface.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,23 @@ def stat(_endpoint, filepath, _userid):
(statInfo.st_ino, _getfilepath(filepath), (tend - tstart) * 1000))
if S_ISDIR(statInfo.st_mode):
raise IOError('Is a directory')
try:
xattrs = {
k.strip('user.'): os.getxattr(_getfilepath(filepath), k).decode()
for k in os.listxattr(_getfilepath(filepath))
}
except OSError as e:
log.info('msg="Failed to invoke listxattr/getxattr" inode="%d" filepath="%s" exception="%s"' %
statInfo.st_ino, _getfilepath(filepath), e)
xattrs = {}
return {
'inode': common.encodeinode('local', str(statInfo.st_ino)),
'filepath': filepath,
'ownerid': str(statInfo.st_uid) + ':' + str(statInfo.st_gid),
'size': statInfo.st_size,
'mtime': statInfo.st_mtime,
'etag': str(statInfo.st_mtime),
'xattrs': xattrs,
}
except (FileNotFoundError, PermissionError) as e:
raise IOError(e) from e
Expand Down Expand Up @@ -147,8 +157,8 @@ def setxattr(endpoint, filepath, userid, key, value, lockmd):
raise IOError(e) from e


def getxattr(_endpoint, filepath, _userid, key):
'''Get the extended attribute <key> on behalf of the given userid. Do not raise exceptions'''
def _getxattr(filepath, key):
'''Internal only: get the extended attribute <key>, do not raise exceptions'''
try:
return os.getxattr(_getfilepath(filepath), 'user.' + key).decode('UTF-8')
except OSError as e:
Expand Down Expand Up @@ -184,7 +194,7 @@ def setlock(endpoint, filepath, userid, appname, value):

def getlock(endpoint, filepath, _userid):
'''Get the lock metadata as an xattr on behalf of the given userid'''
rawl = getxattr(endpoint, filepath, '0:0', common.LOCKKEY)
rawl = _getxattr(filepath, common.LOCKKEY)
if rawl:
lock = common.retrieverevalock(rawl)
if lock['expiration']['seconds'] > time.time():
Expand Down
11 changes: 5 additions & 6 deletions src/core/wopi.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ def checkFileInfo(fileid, acctok):
fmd['SupportsRename'] = fmd['UserCanRename'] = enablerename and \
acctok['viewmode'] in (utils.ViewMode.READ_WRITE, utils.ViewMode.PREVIEW)
fmd['SupportsUserInfo'] = True
uinfo = st.getxattr(acctok['endpoint'], acctok['filename'], acctok['userid'],
utils.USERINFOKEY + '.' + acctok['wopiuser'].split('!')[0])
uinfo = statInfo['xattrs'].get(utils.USERINFOKEY + '.' + acctok['wopiuser'].split('!')[0])
if uinfo:
fmd['UserInfo'] = uinfo
if srv.config.get('general', 'earlyfeatures', fallback='False').upper() == 'TRUE':
Expand Down Expand Up @@ -194,7 +193,7 @@ def setLock(fileid, reqheaders, acctok):

if retrievedLock or op == 'REFRESH_LOCK':
# useful for later checks
savetime = st.getxattr(acctok['endpoint'], fn, acctok['userid'], utils.LASTSAVETIMEKEY)
savetime = statInfo['xattrs'].get(utils.LASTSAVETIMEKEY)
if savetime and (not savetime.isdigit() or int(savetime) < int(statInfo['mtime'])):
# we had stale information, discard
log.warning('msg="Detected external modification" filename="%s" savetime="%s" mtime="%s" token="%s"' %
Expand Down Expand Up @@ -600,9 +599,9 @@ def putFile(fileid, acctok):
try:
if srv.config.get('general', 'detectexternalmodifications', fallback='True').upper() == 'TRUE':
# check now the destination file against conflicts if required
savetime = st.getxattr(acctok['endpoint'], acctok['filename'], acctok['userid'], utils.LASTSAVETIMEKEY)
mtime = None
mtime = st.stat(acctok['endpoint'], acctok['filename'], acctok['userid'])['mtime']
statInfo = st.statx(acctok['endpoint'], acctok['filename'], acctok['userid'], versioninv=1)
savetime = statInfo['xattrs'].get(utils.LASTSAVETIMEKEY)
mtime = statInfo['mtime']
if not savetime or not savetime.isdigit() or int(savetime) < int(mtime):
# no xattr was there or we got our xattr but mtime is more recent: someone may have updated the file from
# a different source (e.g. FUSE or SMB mount), therefore force conflict and return failure to the application
Expand Down
38 changes: 27 additions & 11 deletions src/core/xrootiface.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,28 +218,41 @@ def statx(endpoint, fileref, userid, versioninv=1):
filepath = filepath.decode().replace(EOSVERSIONPREFIX, '').replace('#and#', '&')
else:
filepath = fileref

# stat with the -m flag, so to obtain a k=v list
statInfo = _xrootcmd(endpoint, 'fileinfo', '', userid, 'mgm.path=' + _getfilepath(filepath, encodeamp=True)
+ '&mgm.pcmd=fileinfo&mgm.file.info.option=-m')
xattrs = {}
try:
# output looks like:
# keylength.file=35 file=/eos/.../filename size=2915 mtime=1599649863.0 ctime=1599649866.280468540
# btime=1599649866.280468540 clock=0 mode=0644 uid=xxxx gid=xxxx fxid=19ab8b68 fid=430672744 ino=115607834422411264
# pid=1713958 pxid=001a2726 xstype=adler xs=a2dfcdf9 etag="115607834422411264:a2dfcdf9" detached=0 layout=replica
# nstripes=2 lid=00100112 nrep=2 xattrn=sys.eos.btime xattrv=1599649866.280468540 uid:xxxx[username] gid:xxxx[group]
# tident:xxx name:username dn: prot:https host:xxxx.cern.ch domain:cern.ch geo: sudo:0 fsid=305 fsid=486
# keylength.file=60 file=/eos/.../file name with spaces size=49322 status=healthy mtime=1722868599.312924544
# ctime=1722868599.371644799 btime=1722868599.312743733 atime=1722868599.312744021 clock=0 mode=0644 uid=55375 gid=2763
# fxid=1fef0b88 fid=535759752 ino=143816913334566912 pid=21195331 pxid=01436a43 xstype=adler xs=7fd2729c
# etag="143816913334566912:7fd2729c" detached=0 layout=replica nstripes=2 lid=00100112 nrep=2
# xattrn=sys.eos.btime xattrv=1722868599.312743733
# xattrn=sys.fs.tracking xattrv=+648+260
# xattrn=sys.fusex.state xattrv=<non-decodable>
# xattrn=sys.utrace xattrv=2012194c-5338-11ef-b295-a4bf0179682d
# xattrn=sys.vtrace xattrv=[Mon Aug 5 16:36:39 2024] uid:55375[xxxx] gid:2763[xx] tident:root.11:2881@...
# xattrn=user.iop.wopi.lastwritetime xattrv=1722868599
# fsid=648 fsid=260
# cf. https://gitlab.cern.ch/dss/eos/-/blob/master/archive/eosarch/utils.py
kvlist = [kv.split(b'=') for kv in statInfo.split()]
# extract the key-value pairs, but drop the xattrn/xattrv ones as not needed and potentially containing
# non-unicode-decodable content (cf. CERNBOX-3514)
# extract the key-value pairs for the core metadata and the user xattrs; drop the rest, don't decode as
# some sys xattrs may contain non-unicode-decodable content (cf. CERNBOX-3514)
statxdata = {k.decode(): v.decode().strip('"') for k, v in
[kv for kv in kvlist if len(kv) == 2 and kv[0].find(b'xattr') == -1]}
for ikv, kv in enumerate(kvlist):
if len(kv) == 2 and kv[0].find(b'xattrn=user.'):
xattrs[kv[1].decode().split('user.')] = kvlist[ikv+1][1].decode()

except ValueError as e:
# UnicodeDecodeError exceptions would fall here
log.error(f'msg="Invoked fileinfo but failed to parse output" result="{statInfo}" exception="{e}"')
raise IOError('Failed to parse fileinfo response') from e
if 'treesize' in statxdata:
raise IOError('Is a directory') # EISDIR

if versioninv == 0:
# statx info of the given file:
# we extract the eosinstance from endpoint, which looks like e.g. root://eosinstance[.cern.ch]
Expand All @@ -253,6 +266,7 @@ def statx(endpoint, fileref, userid, versioninv=1):
'size': int(statxdata['size']),
'mtime': int(float(statxdata['mtime'])),
'etag': statxdata['etag'],
'xattrs': xattrs,
}
# now stat the corresponding version folder to get an inode invariant to save operations, see CERNBOX-1216
# also, use the owner's as opposed to the user's credentials to bypass any restriction (e.g. with single-share files)
Expand All @@ -261,6 +275,7 @@ def statx(endpoint, fileref, userid, versioninv=1):
rcv, infov = _getxrdfor(endpoint).query(QueryCode.OPAQUEFILE, _getfilepath(verFolder) + ownerarg + '&mgm.pcmd=stat',
timeout=timeout)
tend = time.time()

try:
if not infov:
raise IOError(f'xrdquery returned nothing, rcv={rcv}')
Expand All @@ -286,6 +301,7 @@ def statx(endpoint, fileref, userid, versioninv=1):
except (IOError, UnicodeDecodeError) as e:
log.error(f'msg="Failed to mkdir/stat version folder" filepath="{_getfilepath(filepath)}" error="{e}"')
raise IOError(e) from e

# return the metadata of the given file, with the inode taken from the version folder
endpoint = _geturlfor(endpoint)
inode = common.encodeinode(endpoint[7:] if endpoint.find('.') == -1 else endpoint[7:endpoint.find('.')], statxdata['ino'])
Expand All @@ -297,6 +313,7 @@ def statx(endpoint, fileref, userid, versioninv=1):
'size': int(statxdata['size']),
'mtime': int(float(statxdata['mtime'])),
'etag': statxdata['etag'],
'xattrs': xattrs,
}


Expand All @@ -313,9 +330,8 @@ def setxattr(endpoint, filepath, _userid, key, value, lockmd):
+ '&mgm.path=' + _getfilepath(filepath, encodeamp=True), appname)


def getxattr(endpoint, filepath, _userid, key):
'''Get the extended attribute <key> via a special open.
The userid is overridden to make sure it also works on shared files.'''
def _getxattr(endpoint, filepath, key):
'''Internal only: get the extended attribute <key> via a special open.'''
if 'user' not in key and 'sys' not in key:
# if nothing is given, assume it's a user attr
key = 'user.' + key
Expand Down Expand Up @@ -364,7 +380,7 @@ def setlock(endpoint, filepath, userid, appname, value, recurse=False):

def getlock(endpoint, filepath, userid):
'''Get the lock metadata as an xattr'''
rawl = getxattr(endpoint, filepath, userid, common.LOCKKEY)
rawl = _getxattr(endpoint, filepath, common.LOCKKEY)
if rawl:
lock = common.retrieverevalock(rawl)
if lock['expiration']['seconds'] > time.time():
Expand Down
9 changes: 5 additions & 4 deletions test/test_storageiface.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def test_statx_fileid(self):
self.assertTrue('size' in statInfo, 'Missing size from stat output')
self.assertTrue('mtime' in statInfo, 'Missing mtime from stat output')
self.assertTrue('etag' in statInfo, 'Missing etag from stat output')
self.assertTrue('xattrs' in statInfo, 'Missing xattrs from stat output')
self.storage.removefile(self.endpoint, self.homepath + '/test.txt', self.userid)

def test_statx_invariant_fileid(self):
Expand Down Expand Up @@ -372,11 +373,11 @@ def test_xattr(self):
self.storage.setlock(self.endpoint, self.homepath + '/test&xattr.txt', self.userid, 'test app', 'xattrlock')
self.storage.setxattr(self.endpoint, self.homepath + '/test&xattr.txt', self.userid, 'testkey', 123,
('test app', 'xattrlock'))
v = self.storage.getxattr(self.endpoint, self.homepath + '/test&xattr.txt', self.userid, 'testkey')
self.assertEqual(v, '123')
fmd = self.storage.statx(self.endpoint, self.homepath + '/test&xattr.txt', self.userid)
self.assertEqual(fmd['xattrs']['testkey'], '123')
self.storage.rmxattr(self.endpoint, self.homepath + '/test&xattr.txt', self.userid, 'testkey', ('test app', 'xattrlock'))
v = self.storage.getxattr(self.endpoint, self.homepath + '/test&xattr.txt', self.userid, 'testkey')
self.assertEqual(v, None)
fmd = self.storage.statx(self.endpoint, self.homepath + '/test&xattr.txt', self.userid, 'testkey')
self.assertIsNone(fmd['xattrs'].get('testkey'))
self.storage.removefile(self.endpoint, self.homepath + '/test&xattr.txt', self.userid)

def test_rename_statx(self):
Expand Down

0 comments on commit a8217ef

Please sign in to comment.