Source code for d6tpipe.api

import copy
import os
import json
import ntpath
import shutil
import warnings
import logging
# logging.basicConfig(level=logging.DEBUG)
from abc import ABC
from pathlib import Path

import jwt

import d6tcollect

from d6tpipe.http_client import client as python_http_client
from d6tpipe.utils.utils import ClientTiny, loadjson, _dict_sort

class ConfigManager(object):
    """
    Manage local config. The config is stored in JSON and can be edited directly at the
    `filecfg` location, by default '~/d6tpipe/cfg.json'. The file holds one entry per
    profile, keyed by profile name.

    Args:
        profile (str): name of profile to use
        filecfg (str): path to where config file is stored

    """

    def __init__(self, profile=None, filecfg='~/d6tpipe/cfg.json'):
        self.profile = 'default' if profile is None else profile
        if str(filecfg).startswith('~'):
            filecfg = os.path.expanduser(filecfg)
        self.filecfg = filecfg

    def init(self, config=None, server='https://pipe.databolt.tech', reset=False):
        """
        Initialize config with content

        Args:
            config (dict): manually pass config object. The dict is copied, the caller's
                object is not modified.
            server (str): location of REST API server
            reset (bool): force reset of an existing config

        Returns:
            None. A warning is issued and nothing is written if the profile already
            exists and `reset` is False.

        """
        if os.path.exists(self.filecfg) and not reset and self.profile in self._loadall():
            # todo: why does Path(self.filecfg).exists() not work in pytest?
            warnings.warn('Config for profile {} in {} already exists, skipping init. Use reset=True to reset config.'.format(self.profile,self.filecfg))
            return None

        # work on a shallow copy so the defaults filled in below never leak into the caller's dict
        config = dict(config) if config else {}
        config.setdefault('server', server)
        config.setdefault('filerepo', '~/d6tpipe')

        # the user-facing 'filerepo' is the root; files of each profile live in their own subfolder
        reporoot = Path(config['filerepo'])
        repoprofile = reporoot/'files/{}/'.format(self.profile)
        config['filereporoot'] = str(reporoot)
        config['filerepo'] = str(repoprofile)
        config.setdefault('filedb', str(repoprofile/'.filedb.json'))

        # create the config folder if it doesn't exist. dirname is empty when filecfg is a
        # bare file name, in which case there is nothing to create
        cfgdir = ntpath.dirname(self.filecfg)
        if cfgdir and not os.path.exists(cfgdir):
            os.makedirs(cfgdir, exist_ok=True)

        self._save(config)

    def update(self, config):
        """
        Update config. Only keys present in the new dict will be updated, other parts of
        the config will be kept as is. In other words you can pass in a partial dict to
        update just the parts you need to be updated.

        Args:
            config (dict): updated config

        Returns:
            bool: True once the config has been saved

        Raises:
            KeyError: if the profile does not exist in the config file

        """
        configall = self._loadall()
        config_current = configall[self.profile]
        config_current.update(config)
        self._save(config_current)
        return True

    def _save(self, config):
        # replace this profile's entry, keep all other profiles in the file as they are
        configall = self._loadall() if os.path.exists(self.filecfg) else {}
        configall[self.profile] = config

        # serialize before opening the file: if a value is not JSON serializable this
        # raises here and the existing config file is left untouched instead of truncated
        payload = json.dumps(configall, indent=4)
        with open(self.filecfg, 'w') as f:
            f.write(payload)
        return True

    def _loadall(self):
        # returns the config of all profiles, creating the file with this profile if missing
        if not os.path.exists(self.filecfg):
            self.init()
            print('auto created profile "{}", see docs how to customize profile'.format(self.profile))

        with open(self.filecfg, 'r') as f:
            config = json.load(f)

        return config

    def load(self):
        """
        Loads config for the current profile. The profile is auto created if it does not
        exist and the profile's file repo folder is created if missing.

        Returns:
            dict: config

        """
        config = self._loadall()
        if self.profile not in config:
            self.init()
            config = self._loadall()
            warnings.warn('auto created profile "{}", see docs how to customize profile'.format(self.profile))
        config = config[self.profile]

        for ikey in ('filereporoot','filerepo','filedb'):
            if config[ikey].startswith('~'):  # do this dynamically
                config[ikey] = os.path.expanduser(config[ikey])

        if not os.path.exists(config['filerepo']):
            os.makedirs(config['filerepo'], exist_ok=True)

        return config
class _APIBase(metaclass=d6tcollect.Collect):
    """
    Common base for the API managers. Resolves the profile config, exposes the location
    of the local file repo and provides helpers that operate on it. The connection
    `self.cnxn` used by several methods is not created here, subclasses assign it.

    Args:
        config (dict): manually pass config object, in which case no config file is used
        profile (str): name of profile to use
        filecfg (str): path to where config file is stored

    """

    def __init__(self, config=None, profile=None, filecfg='~/d6tpipe/cfg.json'):
        self.profile = 'default' if profile is None else profile
        if config is None:
            self.configmgr = ConfigManager(filecfg=filecfg, profile=self.profile)
            self.config = self.configmgr.load()
        else:
            # a manual config is not backed by a config file, so there is no manager to
            # persist changes with. Always define the attribute so it can be checked
            self.configmgr = None
            self.config = config
            warnings.warn("Using manual config override, some api functions might not work")

        self.cfg_profile = self.config
        self.cfg_filecfg = filecfg
        self.filerepo = self.cfg_profile['filerepo']
        self.dir = self.filerepo
        self.dirpath = Path(self.dir)

        self.key = self.cfg_profile.get('key',None)
        if self.key is None:
            # print("Auto generated an encryption key, update the config if you want to use your own")
            import uuid
            self.key = str(uuid.uuid4())
            # only persist the generated key when there is a config file to write to
            if self.configmgr is not None:
                self.configmgr.update({'key':self.key})
        # presumably the settings fields that get encrypted by encode() - confirm against premium implementation
        self.encrypt_keys = ['location','readCredentials','writeCredentials','settings','files','readParams']

    def list_pipes(self, names_only=True, parent_only=False):
        """
        List all pipes you have access to

        Args:
            names_only (bool): if false, return all details
            parent_only (bool): if true, only return pipes which have no parent

        Returns:
            list: sorted pipe names, or the pipe detail records if `names_only` is false

        """
        r = self.cnxn.pipes.get()[1]
        if parent_only:
            r = [o for o in r if not o.get('parent')]
        if names_only:
            r = sorted([o['name'] for o in r])
        return r

    def wipe_all(self, confirm=True):
        """
        Remove all d6tpipe files. WARNING: this can't be undone

        Deletes the connection, the file repo of the profile and the config file.

        Args:
            confirm (bool): ask for user confirmation

        """
        if confirm:
            c = input('Confirm deleting files in {}. \nWARNING: this cannot be undone (y/n)'.format(self.dir))
        else:
            c = 'y'

        if c=='y':
            del self.cnxn
            shutil.rmtree(self.filerepo)
            # with a manual config override there is no config file to remove
            if self.configmgr is not None:
                os.remove(self.configmgr.filecfg)

    def list_local_pipes(self):
        """
        List all pipes already pulled

        Returns:
            list: list of pipe names, ie the non-empty folders in the file repo

        """
        dirs = [name for name in os.listdir(self.filerepo) if os.path.isdir(os.path.join(self.filerepo, name))]
        return [name for name in dirs if os.listdir(os.path.join(self.filerepo, name))]

    def move_repo(self, path):
        """
        Moves all files to another location and updates the config

        Args:
            path (pathlib.Path): new location, str paths are accepted too

        Returns:
            bool: True once files are moved and the api has been reloaded

        Raises:
            RuntimeError: if the api was created with a manual config override

        """
        # fail before touching any files: without a config manager the new location
        # can't be saved and the files would end up somewhere the config doesn't point to
        if self.configmgr is None:
            raise RuntimeError('Cannot move repo when using a manual config override')

        Path(path).mkdir(parents=True, exist_ok=True)
        # NOTE(review): path exists at this point, so shutil.move places the repo folder
        # *inside* path, while the config below points at path itself. The 'filedb' entry
        # of the config is not updated either. Confirm this is the intended layout.
        shutil.move(self.filerepo, str(path))
        # the config is stored as JSON which can't serialize pathlib.Path, store as str
        self.configmgr.update({'filerepo': str(path)})
        print('Moved repo to {}. Reloading api'.format(path))
        self.__init__(profile=self.profile, filecfg=self.cfg_filecfg)

        return True

    def encode(self, dict_):
        """
        Encrypt

        Args:
            dict_ (dict):

        Returns:
            dict: all values encrypted, keys are kept as is

        Raises:
            NotImplementedError: always, this is a premium feature

        """
        raise NotImplementedError('Sign up for premium features to access this function, email support@databolt.tech')

    def decode(self, dict_):
        """
        Decrypt

        Args:
            dict_ (dict):

        Returns:
            dict:

        Raises:
            NotImplementedError: always, this is a premium feature

        """
        raise NotImplementedError('Sign up for premium features to access this function, email support@databolt.tech')
class APILocal(_APIBase,metaclass=d6tcollect.Collect):
    """
    Local alternative to the remote API which stores everything on your machine. It
    mirrors the basic functionality of the remote API but is not as feature rich.
    The connection is a `ClientTiny` opened on the `filedb` entry of the config.

    Args:
        config (dict): manually pass config object
        profile (str): name of profile to use
        filecfg (str): path to where config file is stored

    """

    def __init__(self, config=None, profile=None, filecfg='~/d6tpipe/cfg.json'):
        super().__init__(config=config, profile=profile, filecfg=filecfg)
        filedb = self.config['filedb']
        self.cnxn = ClientTiny(filedb)
        self.mode = 'local'
class APIClient(_APIBase, metaclass=d6tcollect.Collect):
    """
    Manager to interface with the remote API.

    Args:
        token (str): your API token. The default 'config' reads the token from the config
        config (dict): manually pass config object
        profile (str): name of profile to use
        filecfg (str): path to where config file is stored

    """

    def __init__(self, token='config', config=None, profile=None, filecfg='~/d6tpipe/cfg.json'):
        super().__init__(config,profile,filecfg)
        # always defined so it can be read even without token or when the connection fails
        self.username = None
        if token=='config':
            self.token = self.cfg_profile.get('token',None)
        else:
            self.token = token
        if self.token is not None:
            request_headers = {"Authorization": 'Token {0}'.format(self.token)}
        else:  # if not registered
            request_headers = {}

        client = python_http_client.Client(host=self.cfg_profile['server'], request_headers=request_headers, append_slash=True, version='1')
        self.cnxn = client.api
        self.mode = self.cfg_profile.get('mode','local')

        if self.token is not None:
            # test connection. Best effort: problems are reported as warnings so that
            # the object can still be created when the server is unreachable
            try:
                r,d = self.cnxn.get()
                if 'databolt.tech' in self.cfg_profile['server'] and 'username' not in d:
                    warnings.warn('API authentication error')
                else:
                    self.username = d.get('username')
                    print('Connected to {} as {}'.format(self.cfg_profile['server'],self.username))
                if 'message' in d:
                    print(d['message'])
            except Exception as e:
                warnings.warn('API connection error ' + str(e))
        else:
            print('No token provided. Register or login to connect to repo API.')

    def register(self, username, email, password):
        """
        Register a new API user. On success the token is saved to the config and the
        api is reloaded.

        Args:
            username (str):
            email (str):
            password (str):

        Returns:
            str: API token, None if the username or email is already registered

        """
        data = {'username': username, 'email': email, 'password': password}
        # response, data = self.cnxn.accounts.post(request_body=data)
        try:
            response, data = self.cnxn.accounts.post(request_body=data)
        except Exception as e:
            if 'This field must be unique' in str(e):
                warnings.warn("Username or email already registered, registration failed. Pick a different username if you haven't registered before. If you already registered and forgot your token, call api.forgotToken(). If you want to re-register provide a different username")
                return
            # anything else is unexpected, re-raise with the original traceback
            raise
        token = data.get('token')
        self._printtoken(token)
        return token

    def login(self, username, password):
        """
        Login if already registered

        Args:
            username (str):
            password (str):

        Returns:
            str: API token

        """
        return self.forgotToken(username, password)

    def forgotToken(self, username, password):
        """
        Retrieve your API token. The token is saved to the config and the api is reloaded.

        Args:
            username (str):
            password (str):

        Returns:
            str: API token

        """
        data = {'username': username, 'password': password}
        response, data = self.cnxn.accounts.token.post(request_body=data)
        token = data.get('token')
        self._printtoken(token)
        return token

    def setToken(self, token):
        """
        Set your API token. The token is printed, saved to the config and the api is
        reloaded to use it.

        Args:
            token (str): your API token

        """
        self._printtoken(token)

    def _printtoken(self, token):
        # shows the token to the user, saves it to the config and reconnects with it
        print('Your token is below. Please save it and KEEP IT SAFE! If you forget it, you can retrieve it with APIClient().forgotToken(username, password)')
        print(token)
        print('reloading api to update token')
        self.configmgr.update({'token': token})
        self.__init__(profile=self.profile, filecfg=self.cfg_filecfg)

    def _unregister(self, username):
        # deletes the account on the server and clears the token stored in the config
        self.cnxn.accounts._(username).delete()
        self.configmgr.update({'token': None})
@d6tcollect.collect
def list_profiles(filecfg='~/d6tpipe/cfg.json'):
    """
    Print the raw content of the config file, which holds the config of all profiles

    Args:
        filecfg (str): path to where config file is stored

    """
    if str(filecfg).startswith('~'):
        filecfg = os.path.expanduser(filecfg)
    # context manager so the file handle is closed right away
    with open(filecfg) as f:
        print(f.read())
@d6tcollect.collect
def upsert_resource(apiroot, settings):
    """
    Create a resource or update it if it already exists.

    The resource named `settings['name']` is patched first. If the API reports it as
    not found, it is created with a post instead.

    Args:
        apiroot (obj): API endpoint root eg `api.cnxn.pipes`
        settings (dict): resource settings

    Returns:
        response (obj): server response
        data (dict): json returned by server

    """
    try:
        apiroot._(settings['name']).patch(request_body=settings)
    except Exception as err:
        # only a missing resource is handled here, everything else propagates
        if 'Not found' not in str(err):
            raise err
        return apiroot.post(request_body=settings)
    else:
        # update went through, return the resource as it is on the server now
        return apiroot._(settings['name']).get()
@d6tcollect.collect
def upsert_pipe(api, settings):
    """
    Create a pipe or update it if it already exists.

    Args:
        api (obj): api
        settings (dict): pipe settings

    Returns:
        response (obj): server response
        data (dict): json returned by server

    """
    pipes_root = api.cnxn.pipes
    return upsert_resource(pipes_root, settings)
@d6tcollect.collect
def upsert_permissions(api, pipe_name, settings):
    """
    Create or update the permissions of a pipe.

    Args:
        api (obj): api
        pipe_name (str): name of the pipe to set permissions on
        settings (dict): permission settings

    Returns:
        response (obj): server response
        data (dict): json returned by server

    """
    # for now just post to permissions
    return api.cnxn.pipes._(pipe_name).permissions.post(request_body=settings)
def upsert_pipe_json(api, path_json, name):
    """
    Create or update a pipe with settings read from a json file. Keeping the settings
    in a config file avoids having credentials in code.

    Args:
        api (obj): api
        path_json (str): path to config file in json format
        name (str): name of json entry

    Returns:
        response (obj): server response
        data (dict): json returned by server

    """
    all_settings = loadjson(path_json)
    return upsert_pipe(api, all_settings[name])