diff --git a/rcsb/workflow/cli/ExdbExport.py b/rcsb/workflow/cli/ExdbExport.py new file mode 100644 index 0000000..fbbda91 --- /dev/null +++ b/rcsb/workflow/cli/ExdbExport.py @@ -0,0 +1,164 @@ +""" +Command-line entry point for exporting or processing data from MongoDB. +""" + +import logging +import os +import re +import sys +from argparse import ArgumentParser, ArgumentTypeError, Namespace +from collections.abc import Sequence, Mapping +from dataclasses import dataclass +from datetime import datetime +from functools import partial +from pathlib import Path +from typing import ClassVar, Optional, Union + +from rcsb.workflow.mongo.IncrementalExporter import IncrementalExporter, IncrementalSource + +logger = logging.getLogger("rcsb-workflow") + + +@dataclass(frozen=True, slots=True) +class LogConfig: + """Configuration for logging via CLI invocation.""" + + format: str = "{asctime} {levelname:<7} {module}:{lineno} {message}" + date_format: str = "%Y-%m-%d %H:%M:%S.%f" + default_root_level: int = logging.WARNING + default_level: int = logging.INFO + + def apply(self, *, verbose: int, quiet: int) -> None: + """Applies a global logging configuration, choosing levels from `verbose` and `quiet`.""" + level_offset = 10 * quiet - 10 * verbose + logging.basicConfig( + level=max(self.default_root_level + level_offset, 0), + format=self.format, + datefmt=self.date_format, + style="{", + force=True + ) + logger.setLevel(max(self.default_level + level_offset, 0)) + + +class _Args: + + @staticmethod + def int(s: str) -> int: + return min(int(s), 0) + + @staticmethod + def utc_dt(s: str) -> datetime: + dt = datetime.fromisoformat(s) + if not s.endswith("Z") or dt.tzname() not in {"UTC", "Etc/UTC"}: + msg = f"date-time '{s}' is not UTC or does not end in 'Z'" + raise ArgumentTypeError(msg) + return dt + + @staticmethod + def csv(s: str) -> Optional[set[str]]: + if s.strip() == "*": + return set() + return set(map(str.strip, s.split(","))) + + @staticmethod + def out_file(s: str, kwargs: Mapping[str, Union[str, int, Path]]) -> Path: + pattern: re.Pattern = re.compile(r"\{([^:}]*)(:[^}]*)?}") + s = pattern.sub(partial(_Args._sub, kwargs=kwargs), s) + return Path(s) + + @staticmethod + def _sub(m: re.Match, kwargs: Mapping[str, Union[str, int, Path]]) -> str: + var = m.group(1) + fb: Optional[str] = str(m.group(2).removeprefix(":")) if m.group(2) else None + if not var or fb and var not in kwargs or fb == "": + msg = f"Invalid substitution: '{m.group(0)}'" + raise ArgumentTypeError(msg) + value = kwargs.get(var, fb) + return value.name if isinstance(value, Path) else str(value) + + +@dataclass(frozen=True, slots=True) +class Main: + """CLI entry point for reading MongoDB data.""" + + _LOG_CONFIG: ClassVar[LogConfig] = LogConfig() + _DEFAULT_DB_URI: ClassVar[str] = "mongodb://localhost:27017" + _DEFAULT_DB_NAME: ClassVar[str] = "exdb" + + def run(self, args: Sequence[str]) -> None: + ns: Namespace = self._parser().parse_args(args) + self._LOG_CONFIG.apply(verbose=ns.verbose, quiet=ns.quiet) + db_uri = os.environ.get("MONGODB_URI", self._DEFAULT_DB_URI) + db_name = os.environ.get("MONGODB_NAME", self._DEFAULT_DB_NAME) + match ns.subcommand: + case "export": + source = IncrementalSource(db_uri, db_name, ns.collection, ns.fields | ns.id_fields, ns.since_field) + IncrementalExporter(source).export_to(ns.to) + + def _parser(self) -> ArgumentParser: + sup = ArgumentParser( + allow_abbrev=False, + description="Read data from a MongoDB database.", + epilog=( + "Environment variables:" + f" MONGODB_URI mongo:// URI with any needed credentials (default: {self._DEFAULT_DB_URI})." + f" MONGODB_NAME The database name (default: {self._DEFAULT_DB_NAME})." + ), + ) + sup.add_argument( + "-v", "--verbose", action="count", + help="Decrement the log level (repeatable)." + ) + sup.add_argument( + "-q", "--quiet", action="count", + help="Increment the log level (repeatable)." + ) + subs = sup.add_subparsers( + title="subcommands", dest="subcommand", description="subcommands", required=True + ) + # Subcommand 1: `export` + export = subs.add_parser( + "export", allow_abbrev=False, + help= + "Compute a delta from a previous export to a current MongoDB collection." + "Documents to be removed will contain only the id fields." + ) + export.add_argument( + "collection", metavar="COLLECTION", + help="Name of the MongoDB collection." + ) + export.add_argument( + "--fields", type=_Args.csv, default="*", metavar="CSV", + help="List of fields to export." + ) + export.add_argument( + "--id-fields", type=_Args.csv, default="_id", required=True, metavar="CSV", + help="List of fields needed to identify documents. Included in to-delete documents." + ) + export.add_argument( + "--to", default="{collection}-{since:all}.json", metavar="JSON-FILE", + help="Output JSON file path. May refer to {collection}, {delta[:if-empty]}, and {since[:if-empty]}." + ) + export.add_argument( + "--delta", type=Path, metavar="JSON-FILE", + help="Compute a delta from this previous export." + ) + export.add_argument( + "--since", type=_Args.utc_dt, metavar="RFC-3339", + help="Only export docs where '--since-field' ≥ this UTC date-time. Must be RFC 3339 with a 'Z' offset." + ) + export.add_argument( + "--since-field", default="timestamp", metavar="STR", + help="Name of the timestamp field for comparison with '--since'." + ) + # Done + return sup + + +def main() -> None: + Main().run(sys.argv[1:]) + + +if __name__ == "__main__": + main() diff --git a/rcsb/workflow/mongo/IncrementalExporter.py b/rcsb/workflow/mongo/IncrementalExporter.py new file mode 100644 index 0000000..d355b99 --- /dev/null +++ b/rcsb/workflow/mongo/IncrementalExporter.py @@ -0,0 +1,102 @@ +""" +Command-line entry point for exporting or processing data from MongoDB. +""" + +import logging +import os +import sys +import shutil +import time +from argparse import ArgumentParser, Namespace +from collections.abc import Generator, Sequence, Callable, Mapping +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import ClassVar, Optional, Any + +from bson.json_util import dumps as bson_dumps, RELAXED_JSON_OPTIONS, JSONOptions +from pymongo import MongoClient + +logger = logging.getLogger("rcsb-workflow") + + +@dataclass(frozen=True, slots=True) +class UpdateInfo: + path: Path + timestamp: datetime + + +@dataclass(frozen=True, slots=True) +class IncrementalSource: + """How to connect to MongoDB and what data to read for export.""" + + client_factory: ClassVar[Callable[[str], MongoClient]] = MongoClient + db_uri: str + db_name: str + collection: str + fields: set[str] + timestamp_field: Optional[str] + + def __call__(self, *, last_timestamp: Optional[datetime]) -> Generator[dict]: + projection: Optional[dict[str, bool]] = None + if self.fields: + # PyMongo includes `_id` if `projection` is a list or doesn't contain a key `_id`. + # To exclude `_id`, we need to specify `{"_id": False}`. + projection = {field: True for field in self.fields} + if "_id" not in self.fields: + projection |= {"_id": False} + query = {} + if self.timestamp_field and last_timestamp: + query[self.timestamp_field] = {"$gte": last_timestamp.isoformat()} + with self.client_factory(self.db_uri) as client: + db = client[self.db_name][self.collection] + yield from db.find(query, projection=projection) + + +@dataclass(frozen=True, slots=True) +class IncrementalExporter: + """Tool that reads a MongoDB ``source`` and writes to a file.""" + + source: IncrementalSource + json_options: JSONOptions = RELAXED_JSON_OPTIONS + + def export_to(self, since: Optional[datetime], previous_export: Optional[Path], out_file: Path) -> None: + temp_file = self._get_temp_file(out_file) + temp_file_2 = self._get_temp_file_2(out_file) + logger.info(f"Reading documents from {self.source.collection}...") + try: + n_docs = self._export(since, temp_file) + if previous_export: + self._filter(previous_export, temp_file, temp_file_2) + temp_file = temp_file_2 + shutil.move(temp_file, out_file) + finally: + temp_file_2.unlink(missing_ok=True) + temp_file.unlink(missing_ok=True) + logger.info(f"Wrote {n_docs} documents to {out_file}.") + + def _export(self, since: Optional[datetime], temp_file: Path) -> int: + t0 = time.monotonic() + logger.debug(f"Writing to temp file {temp_file}.") + n_docs = 0 + with temp_file.open("w", encoding="utf-8", newline="\n") as f: + f.write("[\n") + for doc in self.source(last_timestamp=since): + f.write(bson_dumps(doc, json_options=self.json_options)) + if n_docs > 0: + f.write(",\n") + n_docs += 1 + if n_docs % 1000 == 0: + logger.debug(f"Wrote {n_docs} docs (Δt = {time.monotonic() - t0:.1} s).") + f.write("\n]\n") + logger.debug(f"Finished export. Wrote {n_docs} docs in {time.monotonic() - t0:.1} s.") + return n_docs + + def _filter(self, previous_export: Path, temp_file: Path, out_file: Path) -> None: + raise NotImplementedError() + + def _get_temp_file(self, out_file: Path) -> Path: + return out_file.parent / f".{out_file.name}.raw.temp" + + def _get_temp_file_2(self, out_file: Path) -> Path: + return out_file.parent / f".{out_file.name}.filtered.temp" diff --git a/rcsb/workflow/mongo/__init__.py b/rcsb/workflow/mongo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/setup.py b/setup.py index e1bcf31..db242c6 100755 --- a/setup.py +++ b/setup.py @@ -5,67 +5,46 @@ # 8-Jun-2021 jdw treat requirements.txt dependencies are authoratative, add markdown README.md text # import re - -from setuptools import find_packages -from setuptools import setup - -packages = [] -thisPackage = "rcsb.workflow" - -with open("rcsb/workflow/cli/__init__.py", "r", encoding="utf-8") as fd: - version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', fd.read(), re.MULTILINE).group(1) - - -# Load packages from requirements*.txt -with open("requirements.txt", "r", encoding="utf-8") as ifh: - packagesRequired = [ln.strip() for ln in ifh.readlines()] - -with open("README.md", "r", encoding="utf-8") as ifh: - longDescription = ifh.read() - -if not version: - raise RuntimeError("Cannot find version information") +from pathlib import Path +from setuptools import find_packages, setup + +version = (re.compile(r"""^__version__ *= *['"]([^'"]+)['"]$""", re.MULTILINE) + .search(Path("rcsb/db/cli/__init__.py").read_text("utf-8")).group(1)) +packages = find_packages(exclude=["rcsb.mock-data", "rcsb.workflow.tests*"]) +requirements = [ + r for r in Path("requirements.txt").read_text("utf-8").splitlines() + if not r.startswith("-") # Strip pip options (e.g. `--extra-index-url`). +] +console_scripts = [ + "exdb_wf_cli=rcsb.workflow.cli.ExDbExec:main", + "imgs_exec_cli=rcsb.workflow.cli.ImgExec:main", +] setup( - name=thisPackage, + name="rcsb.workflow", version=version, description="RCSB Python data processing and ETL/ELT workflow entry points", long_description_content_type="text/markdown", - long_description=longDescription, + long_description=Path("README.md").read_text(encoding="utf-8"), author="John Westbrook", author_email="john.westbrook@rcsb.org", url="https://github.com/rcsb/py-rcsb_workflow", - # license="Apache 2.0", classifiers=[ "Development Status :: 3 - Alpha", - # 'Development Status :: 5 - Production/Stable', "Intended Audience :: Developers", "Natural Language :: English", "License :: OSI Approved :: Apache Software License", "Programming Language :: Python", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", ], - # entry_points={"console_scripts": ["cactvs_annotate_mol=rcsb.workflow.cactvsAnnotateMol:main"]}, - entry_points={"console_scripts": ["exdb_wf_cli=rcsb.workflow.cli.ExDbExec:main", "imgs_exec_cli=rcsb.workflow.cli.ImgExec:main"]}, - # The following is somewhat flakey -- - # dependency_links=[], - install_requires=packagesRequired[1:], - packages=find_packages(exclude=["rcsb.mock-data", "rcsb.workflow.tests", "rcsb.workflow.tests-*", "tests.*"]), - package_data={ - # If any package contains *.md or *.rst ... files, include them: - "": ["*.md", "*.rst", "*.txt", "*.cfg"] - }, - # + entry_points={"console_scripts": console_scripts}, + requires_python=">=3.10", + install_requires=requirements, + packages=packages, test_suite="rcsb.workflow.tests", tests_require=["tox"], - # - # Not configured ... - extras_require={"dev": ["check-manifest"], "test": ["coverage"]}, - # Added for - command_options={"build_sphinx": {"project": ("setup.py", thisPackage), "version": ("setup.py", version), "release": ("setup.py", version)}}, # This setting for namespace package support - zip_safe=False, )