"""
This module contains all models for datarade.
"""
from typing import List
from urllib.parse import quote_plus
from bcp import BCP, Connection
from sqlalchemy import MetaData, create_engine, schema, types, Table
from datarade import git_client
[docs]class DatasetCatalogNotSupportedException(Exception):
"""Occurs when an invalid platform is supplied to a DatasetCatalog instance."""
print('Supported platforms include: github, azure-devops')
[docs]class DriverNotSupportedException(Exception):
"""Occurs when an invalid driver is supplied to a Database instance."""
print('Supported drivers include: mssql')
[docs]class Field:
"""
Represents a column in a dataset
Args:
name: name of the field
type: field type, one of: [Boolean, Date, DateTime, Time, Float, Integer, Numeric, String, Text]
description: non-functional, short description of the field, can include notes about
what the field is or how it's populated
"""
def __init__(self, name: str, type: str, description: str = None):
self.name = name
self.type = type
self.description = description
@property
def sqlalchemy_column(self) -> 'schema.Column':
"""
Converts a datarade Field object into a sqlalchemy Column object
Returns: a sqlalchemy Column object
"""
type_lookup = {
'Boolean': types.Boolean,
'Date': types.Date,
'DateTime': types.DateTime,
'Time': types.Time,
'Float': types.Float,
'Integer': types.Integer,
'Numeric': types.Numeric(18, 2),
'String': types.String,
'Text': types.Text,
}
try:
return schema.Column(self.name, type_lookup[self.type], comment=self.description)
except KeyError as e:
print(f'Not a valid column type: {self.type}')
raise e
[docs]class Database:
"""
Represents a database, either as a source for a Dataset, or as a target in a DatasetContainer
Args:
driver: the type of database, currently only 'mssql' is supported
database_name: the name of the database
host: the name of the server, including the instance
port: the port that the database is listening to on the server
schema_name: the name of the schema
"""
def __init__(self, driver: str, database_name: str, host: str, port: int = None, schema_name: str = None):
self.driver = driver
self.database_name = database_name
self.host = host
self.port = port
self.schema_name = schema_name
[docs] def bcp(self, username: str = None, password: str = None) -> 'BCP':
"""
Takes credentials and returns a BCP object for this database
Args:
username: the username for the database
password: the password for the database
Returns: a BCP object
"""
conn = Connection(driver=self.driver,
host=self.host,
port=self.port,
username=username,
password=password)
return BCP(conn)
[docs] def full_table_name(self, table_name: str) -> str:
"""
A utility method that is needed for MS SQL Server databases which have schemas
Args:
table_name: the one part name of the table
Returns: the three part name of the table, if the schema is present
"""
if self.schema_name is not None:
return f'{self.database_name}.{self.schema_name}.{table_name}'
else:
return table_name
@property
def _sqlalchemy_driver_name(self) -> str:
"""
Selects the sqlalchemy package to use given the database driver
Returns: the sqlalchemy driver in '<database driver>+<sqlalchemy package>' format
"""
if self.driver == 'mssql':
return 'mssql+pyodbc'
else:
print(f'This driver is not supported: {self.driver}')
raise DriverNotSupportedException
@property
def _odbc_driver_name(self) -> str:
"""
Finds the appropriate ODBC driver on the machine given the database driver
Returns: the latest SQL Server Native Client for MS SQL Server databases
"""
if self.driver == 'mssql':
import pyodbc
installed_drivers = pyodbc.drivers()
sql_server_native_clients = [client
for client in installed_drivers
if 'SQL Server Native Client' in client]
try:
latest_client = sorted(sql_server_native_clients)[0]
except IndexError as e:
print('There is no version of the SQL Server Native Client driver installed.')
raise e
return latest_client
else:
print(f'This driver is not supported: {self.driver}')
raise DriverNotSupportedException
[docs]class User:
"""
Represents the user that should be used to access the data.
This should not store the password for obvious reasons, but can be used in conjunction with the password
that is passed to the Database object. This makes it so that the client application that's consuming this
dataset only needs to know the password for the account, not the account or where the account needs to
be setup. It effectively turns the password into a token. This currently supports database users
(e.g. a SQL Server account). To connect as an AD account, run your client application as that account and
don't store the user in the dataset in your dataset catalog. For backwards compatibility, this is not a
necessary attribute on a Dataset.
Args:
username: the username, possibly with a domain (e.g. 'username', 'DOMAIN/username')
"""
def __init__(self, username: str):
self.username = username
[docs]class Dataset:
"""
Represents a dataset as metadata
Args:
name: an identifier for the dataset that is unique within the DatasetCatalog
definition: the sql defining the dataset
fields: a list of Field objects in the dataset
description: non-functional, short description of the dataset, can include notes about
what the dataset is or how it's populated
database: a Database object that contains the data for the dataset
user: a User object that can be used to connect to the database to access the data
"""
def __init__(self, name: str, definition: str, fields: 'List[Field]', description: str = None,
database: 'Database' = None, user: 'User' = None):
self.name = name
self.definition = definition
self.fields = fields
self.description = description
self.database = database
self.user = user
[docs]class DatasetCatalog:
"""
Represents a git repo that hosts datasets in a predetermined structure
This can be thought of as a place to host datasets for data pipelines. But it can also be thought of as a place to
advertise datasets to a broad audience since it only contains metadata and not the underlying data.
Args:
repository: the name of the repository
organization: the name of the organization (or user for GitHub) that owns the repository
platform: that platform that hosts the repo ['github', 'azure-devops']
project: the name of the project that contains the repository, only used for Azure Repos
branch: the branch to use in the repository
username: the username with read access to the repository, only used for Azure Repos
password: the password with read access to the repository, only used for Azure Repos, can also be the one-time
git credentials password that bypasses MFA
"""
def __init__(self, repository: str, organization: str, platform: str, project: str = None, branch: str = 'master',
username: str = None, password: str = None):
self.repository = repository
self.organization = organization
self.project = project
self.branch = branch
self.username = username
self.password = password
self.git = self._get_git_client(platform=platform)
[docs] def _get_git_client(self, platform: str) -> 'git_client.AbstractGitClient':
"""
Configures the appropriate git client given the platform
Args:
platform: the source control platform, one of 'github' or 'azure-devops'
Returns: the appropriate git client
"""
if platform == 'github':
return git_client.GitHubClient(repository=self.repository, organization=self.organization,
branch=self.branch)
elif platform == 'azure-devops':
return git_client.AzureReposClient(repository=self.repository, organization=self.organization,
project=self.project, branch=self.branch,
username=self.username, password=self.password)
else:
raise DatasetCatalogNotSupportedException
[docs]class DatasetContainer:
"""
Represents a target data repository that stores datasets, currently a database
Args:
database: the database to write datasets to
username: a user with create table and insert permissions on the schema
password: the password for the user
"""
def __init__(self, database: 'Database', username: str = None, password: str = None):
self.database = database
self.metadata = self.database.sqlalchemy_metadata(username=username, password=password)
self.bcp = self.database.bcp(username=username, password=password)
[docs] def create_table(self, dataset: 'Dataset'):
"""
Creates a table in the DatasetContainer with the correct attributes to store the data
Args:
dataset: the dataset to use as a blueprint for the table
"""
field_args = [field.sqlalchemy_column for field in dataset.fields]
table = Table(dataset.name, self.metadata, extend_existing=True, *field_args)
table.drop(checkfirst=True)
table.create()