diff --git a/python/grass/app/cli.py b/python/grass/app/cli.py index 0cf05afbb47..64e7ec8cc34 100644 --- a/python/grass/app/cli.py +++ b/python/grass/app/cli.py @@ -27,7 +27,6 @@ import grass.script as gs from grass.app.data import lock_mapset, unlock_mapset, MapsetLockingException -from grass.grassdb.create import create_mapset from grass.exceptions import ScriptError from grass.tools import Tools @@ -98,6 +97,11 @@ def add_mapset_subparser(subparsers): subparser = mapset_subparsers.add_parser("create", help="create a new mapset") subparser.add_argument("path", help="path to the new mapset") + subparser.add_argument( + "--overwrite", + action="store_true", + help=_("overwrite an existing mapset"), + ) subparser.set_defaults(func=subcommand_mapset_create) subparser = mapset_subparsers.add_parser("lock", help="lock a mapset") @@ -133,9 +137,10 @@ def add_mapset_subparser(subparsers): def subcommand_mapset_create(args) -> int: + """Translates args to function parameters""" try: - create_mapset(args.path) - except (ScriptError, OSError) as error: + gs.create_mapset(args.path, overwrite=args.overwrite) + except (ValueError, OSError) as error: print(_("Error creating mapset: {}").format(error), file=sys.stderr) return 1 return 0 diff --git a/python/grass/app/tests/grass_app_cli_test.py b/python/grass/app/tests/grass_app_cli_test.py index 2c8c90a5cca..6cbf57779a6 100644 --- a/python/grass/app/tests/grass_app_cli_test.py +++ b/python/grass/app/tests/grass_app_cli_test.py @@ -240,13 +240,13 @@ def test_create_mapset(tmp_path): def test_mapset_create_exists(tmp_path): - """Check that creating mapset fails when mapset already exists""" + """Check that creating mapset fails unless --overwrite is used""" project = tmp_path / "test_1" mapset = project / "data_1" assert main(["project", "create", str(project)]) == 0 assert main(["mapset", "create", str(mapset)]) == 0 assert main(["mapset", "create", str(mapset)]) == 1 - # There is no overwrite option for mapset yet, so we don't test that. + assert main(["mapset", "create", "--overwrite", str(mapset)]) == 0 def test_create_overwrite(tmp_path): diff --git a/python/grass/script/__init__.py b/python/grass/script/__init__.py index 859c59550a3..3442c8130ca 100644 --- a/python/grass/script/__init__.py +++ b/python/grass/script/__init__.py @@ -8,6 +8,7 @@ compare_key_value_text_files, create_environment, create_location, + create_mapset, create_project, debug, debug_level, @@ -131,6 +132,7 @@ "compare_key_value_text_files", "create_environment", "create_location", + "create_mapset", "create_project", "db_begin_transaction", "db_commit_transaction", diff --git a/python/grass/script/core.py b/python/grass/script/core.py index 389e7c2dd5b..bf7db9df3bf 100644 --- a/python/grass/script/core.py +++ b/python/grass/script/core.py @@ -2121,6 +2121,122 @@ def local_env(): ) +def create_mapset( + path: str | os.PathLike | None = None, + /, + *, + name: str | None = None, + overwrite: bool = False, + initialize_db: bool = True, + env: _Env = None, +) -> None: + """Create a new mapset in an existing project + + The project must already exist. The new mapset uses the project's CRS + and its initial computational region is set from the project's default + region (defined in the PERMANENT mapset). + + By default, the database connection is initialized (equivalent to + ``db.connect -c``), so the mapset is ready for use with vector attribute + data. Set *initialize_db* to False to skip this step. + + The path can be provided in several ways: + + * Full path to the new mapset in an existing project:: + + create_mapset("/home/user/grassdata/project/new_mapset") + + * Path to an existing project with the mapset name as *name*:: + + create_mapset("/home/user/grassdata/project", name="new_mapset") + + * Mapset name only, using the current session's project:: + + create_mapset(name="new_mapset") + create_mapset(name="new_mapset", env=session.env) + + :param path: path to the new mapset or to the project if *name* is given; + can be omitted when *name* is given and a session is active + :param name: mapset name to create (if not part of *path*) + :param overwrite: True to overwrite an existing mapset; the existing + mapset and all its data will be deleted + :param initialize_db: True to initialize the default database + connection in the new mapset (default True) + :param env: environment for the session; if not provided, ``os.environ`` + is used + + :raises ValueError: when neither *path* nor *name* is provided; when + *name* is given without *path* and no session is active; when the + mapset name is illegal or is ``PERMANENT``; when the project does + not exist; or when the mapset already exists and *overwrite* is + False + :raises OSError: when the underlying directory creation fails + """ + from grass.grassdb.create import create_mapset as grassdb_create_mapset + + if env is None: + env = os.environ + + if path is not None and name: + path = Path(path) / name + elif path is None and name: + if not env.get("GISRC"): + msg = "No active session. Provide path or start a session first" + raise ValueError(msg) + gisenv_data = gisenv(env=env) + path = Path(gisenv_data["GISDBASE"]) / gisenv_data["LOCATION_NAME"] / name + elif path is None and not name: + msg = "Either path or name must be provided" + raise ValueError(msg) + + mapset_path = resolve_mapset_path(path=path) + + if not legal_name(mapset_path.mapset): + msg = f"Illegal mapset name <{mapset_path.mapset}>" + raise ValueError(msg) + + if mapset_path.mapset == "PERMANENT": + msg = "Cannot create PERMANENT mapset (it is managed by the project)" + raise ValueError(msg) + + project_dir = mapset_path.path.parent + if not project_dir.exists(): + msg = ( + f"Project <{mapset_path.location}> does not exist in " + f"<{mapset_path.directory}>" + ) + raise ValueError(msg) + + permanent_dir = project_dir / "PERMANENT" + if not permanent_dir.exists(): + msg = ( + f"Project <{mapset_path.location}> is not a valid project " + "(missing PERMANENT mapset)" + ) + raise ValueError(msg) + + if mapset_path.path.exists(): + if not overwrite: + msg = ( + f"Mapset <{mapset_path.mapset}> already exists in project " + f"<{mapset_path.location}>" + ) + raise ValueError(msg) + shutil.rmtree(mapset_path.path) + + grassdb_create_mapset( + mapset_path.directory, mapset_path.location, mapset_path.mapset + ) + + if initialize_db: + # Lazy import to avoid circular dependency between grass.script + # and grass.script.setup at module load time. + from grass.script import setup # pylint: disable=import-outside-toplevel + + with setup.init(mapset_path.path, env=env.copy()) as session: + run_command("db.connect", flags="c", env=session.env) + + def _set_location_description(path, location, text): """Set description (aka title aka MYNAME) for a location diff --git a/python/grass/script/tests/grass_script_create_mapset_test.py b/python/grass/script/tests/grass_script_create_mapset_test.py new file mode 100644 index 00000000000..9cc0c91b764 --- /dev/null +++ b/python/grass/script/tests/grass_script_create_mapset_test.py @@ -0,0 +1,186 @@ +"""Tests for grass.script.create_mapset""" + +import os + +import pytest + +import grass.script as gs +from grass.tools import Tools + + +@pytest.fixture +def project_path(tmp_path): + """Create a XY project and return its path.""" + project = tmp_path / "test_project" + gs.create_project(project) + return project + + +def test_create_mapset_full_path(project_path): + """Check that mapset is created when full path is given.""" + mapset_path = project_path / "new_mapset" + gs.create_mapset(mapset_path, initialize_db=False) + assert mapset_path.exists() + assert (mapset_path / "WIND").exists() + + +def test_create_mapset_string_path(project_path): + """Check that mapset is created when path is given as a string.""" + gs.create_mapset(str(project_path), name="new_mapset", initialize_db=False) + mapset_path = project_path / "new_mapset" + assert mapset_path.exists() + assert (mapset_path / "WIND").exists() + + +def test_create_mapset_with_name(project_path): + """Check that mapset is created when project path and name are given.""" + gs.create_mapset(project_path, name="new_mapset", initialize_db=False) + mapset_path = project_path / "new_mapset" + assert mapset_path.exists() + assert (mapset_path / "WIND").exists() + + +def test_create_mapset_usable(project_path): + """Check that a created mapset can be used in a session.""" + gs.create_mapset(project_path, name="new_mapset") + with gs.setup.init(project_path / "new_mapset", env=os.environ.copy()) as session: + info = gs.gisenv(env=session.env) + assert info["MAPSET"] == "new_mapset" + + +def test_create_mapset_no_overwrite(project_path): + """Check that existing mapset raises error without overwrite.""" + gs.create_mapset(project_path, name="new_mapset") + with pytest.raises(ValueError, match="already exists"): + gs.create_mapset(project_path, name="new_mapset") + + +def test_create_mapset_overwrite(project_path): + """Check that existing mapset can be overwritten.""" + mapset_path = project_path / "new_mapset" + gs.create_mapset(project_path, name="new_mapset", initialize_db=False) + # Add a file to verify old content is removed + marker = mapset_path / "marker_file" + marker.write_text("test") + assert marker.exists() + + gs.create_mapset( + project_path, name="new_mapset", overwrite=True, initialize_db=False + ) + assert mapset_path.exists() + assert (mapset_path / "WIND").exists() + assert not marker.exists() + + +def test_create_mapset_nonexistent_project(tmp_path): + """Check error when project does not exist.""" + with pytest.raises(ValueError, match="does not exist"): + gs.create_mapset(tmp_path / "nonexistent" / "new_mapset") + + +def test_create_mapset_invalid_project(tmp_path): + """Check error when project directory exists but has no PERMANENT mapset.""" + invalid_project = tmp_path / "invalid_project" + invalid_project.mkdir() + with pytest.raises(ValueError, match="PERMANENT"): + gs.create_mapset(invalid_project, name="new_mapset") + + +def test_create_mapset_permanent_rejected(project_path): + """Check that creating PERMANENT mapset is rejected.""" + with pytest.raises(ValueError, match="PERMANENT"): + gs.create_mapset(project_path, name="PERMANENT") + + +@pytest.mark.parametrize("name", [".hidden", "has space", "with@at"]) +def test_create_mapset_illegal_name(project_path, name): + """Check that illegal mapset names are rejected.""" + with pytest.raises(ValueError, match="Illegal"): + gs.create_mapset(project_path, name=name, initialize_db=False) + + +def test_create_multiple_mapsets(project_path): + """Check that multiple mapsets can be created in the same project.""" + names = ["mapset_a", "mapset_b", "mapset_c"] + for name in names: + gs.create_mapset(project_path, name=name, initialize_db=False) + for name in names: + assert (project_path / name).exists() + assert (project_path / name / "WIND").exists() + + +def test_create_mapset_name_only(project_path): + """Check that mapset is created using only name within an active session.""" + with gs.setup.init(project_path, env=os.environ.copy()) as session: + gs.create_mapset(name="new_mapset", env=session.env) + mapset_path = project_path / "new_mapset" + assert mapset_path.exists() + assert (mapset_path / "WIND").exists() + + +@pytest.mark.usefixtures("mock_no_session") +def test_create_mapset_name_only_no_session(): + """Check that name-only fails without an active session.""" + with pytest.raises(ValueError, match="No active session"): + gs.create_mapset(name="new_mapset") + + +def test_create_mapset_no_arguments(): + """Check that calling without path or name raises an error.""" + with pytest.raises(ValueError, match="Either path or name"): + gs.create_mapset() + + +def test_create_mapset_db_initialized(project_path): + """Check that database connection is initialized by default.""" + gs.create_mapset(project_path, name="new_mapset") + mapset_path = project_path / "new_mapset" + assert (mapset_path / "VAR").exists() + with gs.setup.init(mapset_path, env=os.environ.copy()) as session: + tools = Tools(session=session) + conn = tools.db_connect(flags="p", format="json") + assert conn["driver"] == "sqlite" + assert "new_mapset" in conn["database"] + + +def test_create_mapset_db_not_initialized(project_path): + """Check that database initialization is skipped when disabled.""" + gs.create_mapset(project_path, name="new_mapset", initialize_db=False) + mapset_path = project_path / "new_mapset" + assert mapset_path.exists() + assert (mapset_path / "WIND").exists() + assert not (mapset_path / "VAR").exists() + + +def test_create_mapset_with_existing_session_env(project_path): + """Check that creating a mapset works when env comes from an existing session. + + The env already has a GISRC pointing to PERMANENT. The DB initialization + must create its own session for the new mapset, not reuse the old one. + """ + with gs.setup.init(project_path, env=os.environ.copy()) as session: + gs.create_mapset(project_path, name="new_mapset", env=session.env) + mapset_path = project_path / "new_mapset" + assert mapset_path.exists() + assert (mapset_path / "WIND").exists() + assert (mapset_path / "VAR").exists() + with gs.setup.init(mapset_path, env=os.environ.copy()) as session: + tools = Tools(session=session) + conn = tools.db_connect(flags="p", format="json") + assert conn["driver"] == "sqlite" + assert "new_mapset" in conn["database"] + + +def test_create_mapset_region_from_default(project_path): + """Check that the new mapset's region matches the project's default region.""" + with gs.setup.init(project_path, env=os.environ.copy()) as session: + tools = Tools(session=session) + tools.g_region(n=100, s=0, e=200, w=0, res=10, flags="s") + gs.create_mapset(project_path, name="new_mapset") + with gs.setup.init(project_path / "new_mapset", env=os.environ.copy()) as session: + tools = Tools(session=session) + region = tools.g_region(flags="p", format="json") + assert region["north"] == 100 + assert region["south"] == 0 + assert region["east"] == 200 + assert region["west"] == 0