import os, shutil, time, fnmatch, re, copy, itertools
from pathlib import Path, PurePosixPath
import warnings, logging
from datetime import datetime
from tinydb import TinyDB, Query
from tinydb_serialization import SerializationMiddleware
import cachetools
from tqdm import tqdm
from d6tpipe.api import ConfigManager
from d6tpipe.tinydb_serializers import DateTimeSerializer
from d6tpipe.utils import filemd5, copytree
from d6tpipe.exceptions import *
import d6tcollect
#**********************************
# helpers
#**********************************
_cfg_mode_valid = ['default', 'new', 'mod', 'all']
_tdbserialization = SerializationMiddleware()
_tdbserialization.register_serializer(DateTimeSerializer(), 'TinyDate')
def _tinydb_last(db, getkey=None, sortkey='updated_at'):
if db is None or len(db) == 0: return []
r = sorted(db.all(), key=lambda k: k[sortkey])[-1]
return r if getkey is None else r.get(getkey)
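# note: _tinydb_last(db, 'remote') returns the 'remote' field of the most
# recently updated record, or [] if the table is empty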
def _tinydb_insert(db, filessync, filesremote, fileslocal):
if db is not None:
db.insert({'updated_at': datetime.now(), 'action': 'push', 'sync':filessync, 'remote':filesremote, 'local': fileslocal})
def _files_new(filesfrom, filesto):
filesfrom = [f['filename'] for f in filesfrom]
filesto = [f['filename'] for f in filesto]
return list(set(filesfrom)-set(filesto))
def _filenames(list_):
return [d['filename'] for d in list_]
def _files_mod(filesfrom, filesto, key='crc'):
    def _tinydb_to_filedate_dict(query):
        return {d['filename']: d[key] for d in query}
filesto = _tinydb_to_filedate_dict(filesto)
filesfrom = _tinydb_to_filedate_dict(filesfrom)
if filesto and filesfrom:
return [k for k, v in filesfrom.items() if v != filesto.get(k,v)]
else:
return []
def _files_sort(files, sortby='filename'):
return sorted(files, key=lambda k: k[sortby])
def _files_diff(filesfrom, filesto, mode, include, exclude, nrecent=0):
    if nrecent != 0:
        filesfrom_bydate = _files_sort(filesfrom, 'modified_at')
        filesfrom = filesfrom_bydate[-nrecent:] if nrecent > 0 else filesfrom_bydate[:nrecent]
if filesto:
if mode == 'new':
filesdiff = _files_new(filesfrom, filesto)
        elif mode == 'mod':
filesdiff = _files_mod(filesfrom, filesto)
elif mode == 'default':
filesdiff = _files_new(filesfrom, filesto) + _files_mod(filesfrom, filesto)
elif mode == 'all':
filesdiff = _filenames(filesfrom)
else:
raise ValueError('Invalid pipe mode')
else:
filesdiff = _filenames(filesfrom)
filesdiff = _apply_fname_filter(filesdiff, include, exclude)
return filesdiff
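# illustrative example (hypothetical file lists) of how the diff modes combine:
#   filesfrom = [{'filename': 'a.csv', 'crc': '1'}, {'filename': 'b.csv', 'crc': '2'}]
#   filesto   = [{'filename': 'a.csv', 'crc': '9'}]
#   _files_diff(filesfrom, filesto, 'new', None, None)      # => ['b.csv']
#   _files_diff(filesfrom, filesto, 'mod', None, None)      # => ['a.csv']
#   _files_diff(filesfrom, filesto, 'default', None, None)  # => ['b.csv', 'a.csv']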
def _apply_fname_filter(fnames, include, exclude):
    # patterns may combine multiple filters with |, eg *.csv|*.xls*|*.txt
def helper(list_,filter_):
return list(itertools.chain.from_iterable(fnmatch.filter(list_, ifilter) for ifilter in filter_.split('|')))
if include:
fnames = helper(fnames, include)
if exclude:
fnamesex = helper(fnames, exclude)
fnames = list(set(fnames) - set(fnamesex))
return fnames
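# eg _apply_fname_filter(['a.csv', 'b.xlsx', 'notes.txt'], '*.csv|*.xls*', None)
# returns ['a.csv', 'b.xlsx']: include patterns are OR-ed, then exclude matches are removed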
#************************************************
# Pipe
#************************************************
class PipeBase(object, metaclass=d6tcollect.Collect):
"""
    Abstract base class; use its methods via a subclass, don't instantiate it directly
"""
def __init__(self, name, sortby='filename'):
if not re.match(r'^[a-zA-Z0-9-]+$', name):
raise ValueError('Invalid pipe name, needs to be alphanumeric [a-zA-Z0-9-]')
self.name = name
self.sortby = sortby
def _set_dir(self, dir):
self.filerepo = self.cfg_profile['filerepo']
self.dirpath = Path(self.filerepo)/dir
self.dirpath.mkdir(parents=True, exist_ok=True) # create dir if doesn't exist
self.dir = str(self.dirpath) + os.sep
def _getfilter(self, include=None, exclude=None):
return include, exclude
def _files_sort(self, files, sortby=None):
sortby = self.sortby if sortby is None else sortby
sortby = 'filename' if sortby=='name' else sortby
sortby = 'modified_at' if sortby=='mod' else sortby
return _files_sort(files, sortby)
    def scan_local(self, include=None, exclude=None, attributes=False, sortby=None, files=None, fromdb=False, on_not_exist='warn'):
"""
        Get file attributes from local. Run before doing a pull/push
Args:
include (str): pattern of files to include, eg `*.csv` or `a-*.csv|b-*.csv`
exclude (str): pattern of files to exclude
attributes (bool): return filenames with attributes
            sortby (str): sort files by this key. `name`, `mod`, `size`
files (list): override list of filenames
fromdb (bool): use files from local db, if false scans all files in pipe folder
            on_not_exist (str): how to handle missing files when creating file attributes, `'warn'` warns before skipping, anything else skips silently
Returns:
            list: filenames, or an (attributes, filenames) tuple if `attributes=True`
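        Example:
            `pipe.scan_local(include='*.csv', attributes=True)` (illustrative, assumes csv files in the pipe folder)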
"""
if files is None:
if fromdb:
files = _tinydb_last(self.dbfiles,'local')
files = _filenames(files)
else:
files = [str(PurePosixPath(p.relative_to(self.dir))) for p in self.dirpath.glob('**/*') if not p.is_dir()]
include, exclude = self._getfilter(include, exclude)
files = _apply_fname_filter(files, include, exclude)
def getattrib(fname):
p = Path(self.dirpath)/fname
if not p.exists():
if on_not_exist=='warn':
warnings.warn('Local file {} does not exist'.format(fname))
return None
dtmod = datetime.fromtimestamp(p.stat().st_mtime)
crc = filemd5(p)
return {'filename':fname, 'modified_at': dtmod, 'size': p.stat().st_size, 'crc': crc}
filesall = [getattrib(fname) for fname in files]
filesall = [o for o in filesall if o is not None]
filesall = self._files_sort(filesall, sortby)
if attributes:
return filesall, files
else:
return _filenames(filesall)
    def files(self, include=None, exclude=None, sortby=None, check_exists=True, fromdb=True):
"""
Files synced in local repo
Args:
include (str): pattern of files to include, eg `*.csv` or `a-*.csv|b-*.csv`
exclude (str): pattern of files to exclude
            sortby (str): sort files by this key. `name`, `mod`, `size`
            check_exists (bool): check files exist locally
            fromdb (bool): use files from local db, if false scans all files in pipe folder
Returns:
list: filenames
"""
return self.filepaths(aspath=False, include=include, exclude=exclude, sortby=sortby, check_exists=check_exists, fromdb=fromdb)
    def filepaths(self, include=None, exclude=None, sortby=None, check_exists=True, aspathlib=True, aspath=True, fromdb=True):
"""
        Full paths to files synced in local repo
Args:
include (str): pattern of files to include, eg `*.csv` or `a-*.csv|b-*.csv`
exclude (str): pattern of files to exclude
            sortby (str): sort files by this key. `name`, `mod`, `size`
            check_exists (bool): check files exist locally
            aspathlib (bool): return as pathlib object
            aspath (bool): return full paths, else filenames only
            fromdb (bool): use files from local db, if false scans all files in pipe folder
Returns:
            list: paths to files as `pathlib.Path` or `str`, filenames only if `aspath=False`
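        Example:
            `pipe.filepaths(include='*.csv', aspathlib=False)` (illustrative) returns `str` paths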
"""
files = _tinydb_last(self.dbfiles,'local') if fromdb else self.scan_local(attributes=True)[0]
files = self._files_sort(files, sortby)
fnames = _apply_fname_filter(_filenames(files), include, exclude)
        if check_exists:
            for fname in fnames:
                if not (self.dirpath/fname).exists():
                    raise FileNotFoundError(fname)
if aspath:
if not aspathlib:
return [str(self.dirpath/f) for f in fnames]
else:
return [self.dirpath / f for f in fnames]
else:
return fnames
    def import_files(self, files, subdir=None, move=False):
"""
Import files to repo
Args:
files (list): list of files, eg from `glob.iglob('folder/**/*.csv')`
subdir (str): sub directory to import into
move (bool): move or copy
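        Example:
            `pipe.import_files(glob.glob('data/*.csv'), subdir='raw')` (illustrative, assumes a local `data/` folder)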
"""
dstdir = self.dirpath/subdir if subdir else self.dirpath
dstdir.mkdir(parents=True, exist_ok=True)
        if move:
            for ifile in files:
                shutil.move(ifile, dstdir/Path(ifile).name)
        else:
            for ifile in files:
                shutil.copy(ifile, dstdir/Path(ifile).name)
    def import_file(self, file, subdir=None, move=False):
"""
Import a single file to repo
Args:
            file (str): path to file
subdir (str): sub directory to import into
move (bool): move or copy
"""
self.import_files([file], subdir, move)
    def import_dir(self, dir, move=False):
"""
Import a directory including subdirs
Args:
dir (str): directory
move (bool): move or copy
"""
copytree(dir,self.dir, move)
    def delete_files(self, files=None, confirm=True, all_local=None):
"""
Delete files, local and remote
Args:
files (list): filenames, if empty delete all
confirm (bool): ask user to confirm delete
all_local (bool): delete all files or just synced files? (local only)
"""
self.delete_files_remote(files=files, confirm=confirm)
self.delete_files_local(files=files, confirm=confirm, delete_all=all_local)
    def delete_files_remote(self, files=None, confirm=True):
"""
        Delete remote files
Args:
files (list): filenames, if empty delete all
confirm (bool): ask user to confirm delete
"""
if not files:
files='all'
if confirm:
c = input('Confirm deleting {} files in {} (y/n)'.format(files, self.remote_prefix))
if c=='n': return None
else:
c = 'y'
if c=='y':
if files=='all':
files = self.scan_remote(cached=False)
self._pullpush_luigi(files, op='remove')
    def delete_files_local(self, files=None, confirm=True, delete_all=None, ignore_errors=False):
"""
        Delete local files and reset the file database
Args:
files (list): filenames, if empty delete all
confirm (bool): ask user to confirm delete
delete_all (bool): delete all files or just synced files?
ignore_errors (bool): ignore missing file errors
"""
if not files:
files='all'
if confirm:
c = input('Confirm deleting {} files in {} (y/n)'.format(files, self.dir))
if c=='n': return None
else:
c = 'y'
if files=='all' and delete_all is None:
d = input('Delete all files or just downloaded files (a/d)')
delete_all = True if d=='a' else False
if c=='y':
if delete_all:
shutil.rmtree(self.dir, ignore_errors=ignore_errors)
            else:
                for f in self.filepaths():
                    f.unlink()
self.dbfiles.purge()
class PipeLocal(PipeBase, metaclass=d6tcollect.Collect):
"""
Managed data pipe, LOCAL mode for accessing local files ONLY
Args:
        name (str): name of the data pipe
        config (dict): manual config override
        profile (str): name of profile to use
        filecfg (str): path to where config file is stored
        sortby (str): sort files by this key. `name`, `mod`, `size`
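    Example:
        `pipe = PipeLocal('demo')` (illustrative, assumes a pipe named 'demo' was pulled previously)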
"""
def __init__(self, name, config=None, profile=None, filecfg='~/d6tpipe/cfg.json', sortby='filename'):
super().__init__(name, sortby)
self.profile = 'default' if profile is None else profile
if config is None:
self.configmgr = ConfigManager(filecfg=filecfg, profile=self.profile)
self.config = self.configmgr.load()
else:
self.config = config
warnings.warn("Using manual config override, some api functions might not work")
self.cfg_profile = self.config
self._set_dir(self.name)
# create db connection
self._db = TinyDB(self.cfg_profile['filedb'], storage=_tdbserialization)
self.dbfiles = self._db.table(name+'-files')
self.dbconfig = self._db.table(name+'-cfg')
self.settings = self.dbconfig.all()[-1]['pipe'] if self.dbconfig.all() else {}
self.schema = self.settings.get('schema',{})
        print('Operating in local mode. Use this to access local files; for remote operations use `Pipe()`')
class Pipe(PipeBase, metaclass=d6tcollect.Collect):
"""
Managed data pipe
Args:
api (obj): API manager object
name (str): name of the data pipe. Has to be created first
mode (str): sync mode
        sortby (str): sort files by this key. `name`, `mod`, `size`
credentials (dict): override credentials
Note:
* mode: controls which files are synced
* 'default': modified and new files
* 'new': new files only
* 'mod': modified files only
* 'all': all files
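    Example:
        `pipe = Pipe(api, 'demo'); pipe.pull()` (illustrative, assumes an `api` object and an existing pipe named 'demo')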
"""
def __init__(self, api, name, mode='default', sortby='filename', credentials=None):
# set params
super().__init__(name, sortby)
self.api = api
        self.api_islocal = api.__class__.__name__ == 'APILocal'
        if mode not in _cfg_mode_valid:
raise ValueError('Invalid mode, needs to be {}'.format(_cfg_mode_valid))
self.mode = mode
# get remote details
self.cnxnapi = api.cnxn
self.cnxnpipe = self.cnxnapi.pipes._(name)
self.settings = self.cnxnpipe.get()[1]
if not self.settings:
raise ValueError('pipe not found, make sure it was created')
if self.settings['protocol'] not in ['s3','ftp', 'sftp']:
raise NotImplementedError('Unsupported protocol, only s3 and (s)ftp supported')
self.settings['options'] = self.settings.get('options',{})
self.remote_prefix = self.settings['options']['remotepath']
self.encrypted_pipe = self.settings['options'].get('encrypted',False)
if self.encrypted_pipe:
self.settings = self.api.decode(self.settings)
self.role = self.settings.get('role')
self.cfg_profile = api.cfg_profile
self._set_dir(self.name)
self.credentials_override = credentials
# DDL
self.schema = self.settings.get('schema',{})
# create db connection
self._db = TinyDB(self.cfg_profile['filedb'], storage=_tdbserialization)
self.dbfiles = self._db.table(name+'-files')
self.dbconfig = self._db.table(name+'-cfg')
self._cache_scan = cachetools.TTLCache(maxsize=1, ttl=5*60)
# connect msg
        msg = 'Successfully connected to pipe {}.'.format(self.name)
        if self.role == 'read':
            msg += ' Read-only access'
print(msg)
self.dbconfig.upsert({'name': self.name, 'pipe': self.settings}, Query().name == self.name)
def _getfilter(self, include, exclude):
if include is None:
include = self.settings['options'].get('include')
if exclude is None:
exclude = self.settings['options'].get('exclude')
return include, exclude
    def setmode(self, mode):
"""
Set sync mode
Args:
mode (str): sync mode
Note:
* mode: controls which files are synced
* 'default': modified and new files
* 'new': new files only
* 'mod': modified files only
* 'all': all files
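        Example:
            `pipe.setmode('all'); pipe.pull()` (illustrative) re-syncs every file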
"""
assert mode in _cfg_mode_valid
self.mode = mode
    def update_settings(self, settings):
"""
        Update settings. Only keys present in the new dict are updated; everything else is kept as is. In other words, you can pass a partial dict to update just the parts you need.
Args:
            settings (dict): updated settings
"""
self.settings.update(settings)
response, data = self.cnxnpipe.patch(self.settings)
return response, data
    def scan_remote(self, attributes=False, cached=True):
"""
        Get file attributes from remote. Run before doing a pull/push
        Args:
            attributes (bool): return filenames with attributes
            cached (bool): use cached results
Returns:
            list: remote filenames, or a (response, attributes) tuple if `attributes=True`
"""
c = self._cache_scan.get(0)
if cached and c is not None:
response, data = c
else:
filesall, filenames = self._list_luigi()
response, data = (),filesall # REST API style, for future to return from API
self._cache_scan[0] = (response, data)
if attributes:
return response, data
else:
return _filenames(data)
    def is_synced(self, israise=False):
"""
Check if local is in sync with remote
Args:
            israise (bool): raise an exception if not synced
Returns:
            bool: True if local is in sync with remote
"""
filespull = self.pull(dryrun=True)
if filespull:
if israise:
raise PushError(['Remote has changes not pulled to local repo, run pull first', filespull])
else:
return False
return True
    def pull_preview(self, files=None, include=None, exclude=None, nrecent=0, cached=True):
"""
Preview of files to be pulled
Args:
files (list): override list of filenames
include (str): pattern of files to include, eg `*.csv` or `a-*.csv|b-*.csv`
exclude (str): pattern of files to exclude
nrecent (int): use n newest files by mod date. 0 uses all files. Negative number uses n old files
cached (bool): if True, use cached remote information, default 5mins. If False forces remote scan
Returns:
            list: filenames to be pulled
"""
return self.pull(files=files, dryrun=True, include=include, exclude=exclude, nrecent=nrecent, cached=cached)
    def pull(self, files=None, dryrun=False, include=None, exclude=None, nrecent=0, merge_mode=None, cached=True):
"""
Pull remote files to local
Args:
files (list): override list of filenames
dryrun (bool): preview only
include (str): pattern of files to include, eg `*.csv` or `a-*.csv|b-*.csv`
exclude (str): pattern of files to exclude
nrecent (int): use n newest files by mod date. 0 uses all files. Negative number uses n old files
            merge_mode (str): how to handle pull conflicts, ie files that changed both locally and remotely: 'keep' local files or 'overwrite' local files
cached (bool): if True, use cached remote information, default 5mins. If False forces remote scan
Returns:
            list: synced filenames
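        Example:
            `pipe.pull(include='*.csv', nrecent=10)` (illustrative) pulls from the 10 most recently modified remote files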
"""
if not cached:
self._cache_scan.clear()
filesremote = self.scan_remote(attributes=True)[1]
if files is not None:
filespull = files
else:
logging.debug(['remote files', filesremote])
fileslocal = _files_sort(_tinydb_last(self.dbfiles,'remote'), self.sortby)
filespull = _files_diff(filesremote, fileslocal, self.mode, include, exclude, nrecent)
filespull_size = sum(f['size'] for f in filesremote if f['filename'] in filespull)
print('pulling: {:.2f}MB'.format(filespull_size / 2 ** 20))
if dryrun:
return filespull
# check if any local files have changed
fileslocal = _tinydb_last(self.dbfiles,'local')
        fileslocalmod, _ = self.scan_local(files=[s for s in filespull if s in _filenames(fileslocal)], attributes=True)
filesmod = _files_mod(fileslocal, fileslocalmod)
if filesmod:
if merge_mode is None:
raise PullError('Modified files will be overwritten by pull, specify merge mode '+str(filesmod))
elif merge_mode == 'keep':
filespull = list(set(filespull)-set(filesmod))
elif merge_mode == 'overwrite':
pass
else:
raise ValueError('invalid merge mode')
filessync = self._pullpush_luigi(filespull, 'get')
# scan local files after pull
fileslocal, _ = self.scan_local(files=filessync, attributes=True)
# update db
_tinydb_insert(self.dbfiles, filessync, filesremote, fileslocal)
self.dbconfig.upsert({'name': self.name, 'remote': self.settings, 'pipe': self.settings}, Query().name == self.name)
        # print README.md / LICENSE.md if present
        for fname_doc in ['README.md', 'LICENSE.md']:
            if fname_doc in filessync:
                header = '############### {} ###############'.format(fname_doc[:-3])
                print(header)
                with open(self.dirpath/fname_doc, 'r') as fhandle:
                    print(fhandle.read())
                print(header)
return filessync
    def push_preview(self, files=None, include=None, exclude=None, nrecent=0, cached=True):
"""
Preview of files to be pushed
Args:
files (list): override list of filenames
include (str): pattern of files to include, eg `*.csv` or `a-*.csv|b-*.csv`
exclude (str): pattern of files to exclude
nrecent (int): use n newest files by mod date. 0 uses all files. Negative number uses n old files
cached (bool): if True, use cached remote information, default 5mins. If False forces remote scan
Returns:
list: filenames with attributes
"""
return self.push(files=files, dryrun=True, include=include, exclude=exclude, nrecent=nrecent, cached=cached)
    def push(self, files=None, dryrun=False, fromdb=False, include=None, exclude=None, nrecent=0, cached=True):
"""
Push local files to remote
Args:
files (list): override list of filenames
dryrun (bool): preview only
fromdb (bool): use files from local db, if false scans all files in pipe folder
include (str): pattern of files to include, eg `*.csv` or `a-*.csv|b-*.csv`
exclude (str): pattern of files to exclude
nrecent (int): use n newest files by mod date. 0 uses all files. Negative number uses n old files
cached (bool): if True, use cached remote information, default 5mins. If False forces remote scan
Returns:
            list: synced filenames
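        Example:
            `pipe.push_preview()` then `pipe.push()` (illustrative) previews and then syncs local changes to remote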
"""
self._has_write()
if not cached:
self._cache_scan.clear()
if files is not None:
filespush = files
fileslocal, _ = self.scan_local(fromdb=True, attributes=True)
else:
            # baseline is the last synced local state recorded in the db
            filesbase = _tinydb_last(self.dbfiles, 'local')
            fileslocal, _ = self.scan_local(fromdb=fromdb, attributes=True)
            if self.mode != 'all': self.is_synced(israise=True)
            filespush = _files_diff(fileslocal, filesbase, self.mode, include, exclude, nrecent)
filespush_size = sum(f['size'] for f in fileslocal if f['filename'] in filespush)
if dryrun:
print('pushing: {:.2f}MB'.format(filespush_size/2**20))
return filespush
filessync = self._pullpush_luigi(filespush, 'put')
# get files on remote after push
filesremote = self.scan_remote(cached=False, attributes=True)[1]
_tinydb_insert(self.dbfiles, filessync, filesremote, fileslocal)
return filessync
    def reset(self, delete=False):
        """
        Resets the pipe by pulling all files, optionally deleting all local files first
        Args:
            delete (bool): delete local files before pulling
        """
if delete:
self.delete_files_local()
self.setmode('all')
self.pull(cached=False)
self.setmode('default')
    def remove_orphans(self, files=None, direction='local', dryrun=None):
"""
Remove file orphans locally and/or remotely. When you remove files, they don't get synced because pull/push only looks at new or modified files. Use this to clean up any removed files.
Args:
            files (list): override list of filenames
            direction (str): where to remove files
            dryrun (bool): preview only
Note:
* direction:
* 'local': remove files locally, ie files that exist on local but not in remote
* 'remote': remove files remotely, ie files that exist on remote but not in local
* 'both': combine local and remote
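        Example:
            `pipe.remove_orphans(direction='both', dryrun=False)` (illustrative) deletes orphaned files on both sides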
"""
assert direction in ['both','local','remote']
if dryrun is None:
warnings.warn('dryrun active by default, to execute explicitly pass dryrun=False')
dryrun = True
if files is None:
fileslocal = self.scan_local(attributes=True)[0]
filesremote = self.scan_remote(attributes=True)[1]
filesrmlocal = []
filesrmremote = []
if direction in ['local','both']:
filesrmlocal = _files_new(fileslocal, filesremote)
if direction in ['remote','both']:
self._has_write()
filesrmremote = _files_new(filesremote, fileslocal)
        else:
            filesrmlocal = files
            filesrmremote = files
if dryrun:
return {'local': filesrmlocal, 'remote': filesrmremote}
for fname in filesrmlocal:
try:
(self.dirpath/fname).unlink()
            except OSError:
warnings.warn('Unable to delete file {}'.format(fname))
filesrmremote = self._pullpush_luigi(filesrmremote, 'remove')
return {'local': filesrmlocal, 'remote': filesrmremote}
def _list_luigi(self):
def scan_s3():
cnxn = self._connect()
idxStart = len(self.remote_prefix)
filesall = list(cnxn.listdir(self.remote_prefix,return_key=True))
def s3path(o):
return 's3://'+o.bucket_name+'/'+o.key
filesall = [{'filename':s3path(o)[idxStart:], 'modified_at': str(o.last_modified), 'size':o.size, 'crc': o.e_tag.strip('\"')} for o in filesall if o.key[-1]!='/']
return filesall
def scan_ftp():
try:
from ftpsync.ftp_target import FtpTarget
            except ImportError:
raise ModuleNotFoundError('pyftpsync not found. Run `pip install pyftpsync`')
ftp_dir = self.remote_prefix
idxStart = len(ftp_dir)
cnxn = self._connect()
try:
if ftp_dir!='/':
cnxn.exists(ftp_dir)
except Exception as e:
if 'No such file or directory' in str(e):
cnxn._ftp_mkdirs(ftp_dir)
return [] # created dir so will be empty
credentials = self._get_credentials()
remote = FtpTarget(ftp_dir, self.settings['location'], username=credentials['username'], password=credentials['password'])
remote.open()
try:
filesftp = list(remote.walk())
filesall = [{'filename':str(PurePosixPath(o.rel_path)/o.name)[idxStart:], 'modified_at': datetime.fromtimestamp(o.mtime), 'size':o.size, 'crc': "{}-{}".format(str(o.mtime),str(o.size)) } for o in filesftp if not o.is_dir()]
finally:
remote.close()
return filesall
def scan_sftp():
cnxn = self._connect()
if cnxn.exists(self.remote_prefix):
filesall = cnxn.listdir_attr(self.remote_prefix)
filesall = [{'filename':o.relpath, 'modified_at': datetime.fromtimestamp(o.st_mtime), 'size':o.st_size, 'crc': "{}-{}".format(str(o.st_mtime),str(o.st_size)) } for o in filesall]
else:
filesall = []
# cnxn.close_del()
return filesall
if self.settings['protocol'] == 's3':
filesall = scan_s3()
elif self.settings['protocol'] == 'ftp':
filesall = scan_ftp()
elif self.settings['protocol'] == 'sftp':
filesall = scan_sftp()
else:
raise NotImplementedError('only s3, ftp, sftp supported')
include, exclude = self._getfilter(None,None)
filenames = _filenames(filesall)
filenames = _apply_fname_filter(filenames, include, exclude)
filesall = [d for d in filesall if d['filename'] in filenames]
filesall = sorted(filesall, key = lambda x: x['filename'])
filenames = sorted(filenames)
return filesall, filenames
def _pullpush_luigi(self, files, op, cnxn=None):
if cnxn is None:
cnxn = self._connect(op in ['put','remove'])
filessync = []
        for fname in tqdm(files):
fnameremote = self.remote_prefix+fname
fnamelocalpath = self.dirpath/fname
fnamelocal = str(PurePosixPath(fnamelocalpath))
if op=='put':
cnxn.put(fnamelocal, fnameremote)
elif op=='get':
fnamelocalpath.parent.mkdir(parents=True, exist_ok=True)
cnxn.get(fnameremote, fnamelocal)
elif op=='remove':
try:
cnxn.remove(fnameremote)
                except Exception:
warnings.warn('Unable to delete remote file {}'.format(fnameremote))
elif op=='exists':
fname = cnxn.exists(fnameremote)
else:
raise ValueError('invalid luigi operation')
            logging.info('synced file {}'.format(fname))
filessync.append(fname)
self._disconnect(cnxn)
return filessync
def _has_write(self):
if self.role=='read':
raise ValueError('Read-only role, cannot write')
def _get_credentials(self, write=False):
action = 'write' if write else 'read'
# check role
if write: self._has_write()
# get credentials from api
if self.api_islocal:
credentials = self.cnxnpipe.get()[1]['credentials']
if 'read' in credentials:
credentials = credentials[action]
if 'key' in credentials:
credentials['aws_access_key_id'] = credentials.pop('key')
if 'secret' in credentials:
credentials['aws_secret_access_key'] = credentials.pop('secret')
else:
credentials = self.cnxnpipe.credentials.get(query_params={'role':action})[1]
if not credentials:
raise ValueError('No {} credentials provided, make sure pipe has credentials. '.format('write' if write else 'read'))
return credentials
def _reset_credentials(self):
self.cnxnpipe.credentials.get(query_params={'role':'read','renew':True})
def _connect(self, write=False):
credentials = self._get_credentials(write)
if self.settings['protocol'] == 's3':
from luigi.contrib.s3 import S3Client
from d6tpipe.luigi.s3 import S3Client as S3ClientToken
            # the d6tpipe S3Client variant handles temporary session credentials
            if 'aws_session_token' in credentials:
                cnxn = S3ClientToken(**credentials)
            else:
                cnxn = S3Client(**credentials)
elif self.settings['protocol'] == 'ftp':
from d6tpipe.luigi.ftp import RemoteFileSystem
cnxn = RemoteFileSystem(self.settings['location'], credentials['username'], credentials['password'])
elif self.settings['protocol'] == 'sftp':
from d6tpipe.luigi.ftp import RemoteFileSystem
try:
import pysftp
except ImportError:
raise ModuleNotFoundError('Please install pysftp to use SFTP.')
cnopts = pysftp.CnOpts()
cnopts.hostkeys = None
cnxn = RemoteFileSystem(self.settings['location'], credentials['username'], credentials['password'], sftp=True, pysftp_conn_kwargs={'cnopts':cnopts})
else:
            raise NotImplementedError('only s3, ftp, sftp supported')
return cnxn
def _disconnect(self, cnxn):
if self.settings['protocol'] == 'ftp':
cnxn.close_del()