Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/reference_index/apps/users.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ users
api
courses_urls
forms
management.commands.archive_users
management.commands.import_groups
management.commands.lock
models
Expand Down
24 changes: 21 additions & 3 deletions intranet/apps/dataimport/management/commands/year_cleanup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ def add_arguments(self, parser):
default=get_senior_graduation_year(),
help="The senior graduation year",
)
user_mode = parser.add_mutually_exclusive_group(required=True)
user_mode.add_argument("--delete-users", action="store_true", dest="delete_users", help="Delete graduated users.")
user_mode.add_argument("--archive-users", action="store_true", dest="archive_users", help="Archive graduated users instead of deleting.")
user_mode.add_argument("--no-user-changes", action="store_true", dest="no_user_changes", help="Do not change users.")

def ask(self, q):
if input(f"{q} [Yy]: ").lower() != "y":
Expand Down Expand Up @@ -81,9 +85,16 @@ def handle(self, *args, **options):
if do_run:
self.update_welcome()

self.stdout.write("Deleting graduated users")
if do_run:
self.handle_delete(senior_grad_year=senior_grad_year)
if options["delete_users"]:
self.stdout.write("Deleting graduated users")
if do_run:
self.handle_delete(senior_grad_year=senior_grad_year)
elif options["archive_users"]:
self.stdout.write("Archiving graduated users")
if do_run:
self.handle_archive(senior_grad_year=senior_grad_year)
else:
self.stdout.write("Skipping user delete/archive")

self.stdout.write("Archiving admin comments")
if do_run:
Expand Down Expand Up @@ -112,3 +123,10 @@ def handle_delete(self, *, senior_grad_year: int):
usr.user_type = "alum"
usr.save()
self.stdout.write(f"User {usr.username} KEEP")

def handle_archive(self, *, senior_grad_year: int):
users = get_user_model().objects.filter(graduation_year__lt=senior_grad_year).exclude(user_type="alum")
archived_count, already_archived_count = get_user_model().archive_users(users)
self.stdout.write(f"Archived users: {archived_count}")
if already_archived_count:
self.stdout.write(f"Already archived users: {already_archived_count}")
4 changes: 2 additions & 2 deletions intranet/apps/dataimport/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def test_year_cleanup(self):
out = StringIO()
year = timezone.now().year
turnover_date = datetime(year, 7, 1)
call_command("year_cleanup", stdout=out, senior_grad_year=year + 1)
call_command("year_cleanup", stdout=out, senior_grad_year=year + 1, delete_users=True)
output = [
"In pretend mode.",
"Turnover date set to: {}".format(turnover_date.strftime("%c")),
Expand Down Expand Up @@ -52,7 +52,7 @@ def test_actual_year_cleanup(self):
"intranet.apps.dataimport.management.commands.year_cleanup.timezone.now",
return_value=datetime(2020, 6, 20, tzinfo=pytz.timezone("America/New_York")),
) as m:
call_command("year_cleanup", senior_grad_year=2021, run=True, confirm=True)
call_command("year_cleanup", senior_grad_year=2021, run=True, confirm=True, delete_users=True)

m.assert_called()

Expand Down
12 changes: 11 additions & 1 deletion intranet/apps/users/admin.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,19 @@
from django.contrib import admin
from django.contrib import admin, messages

from ..users.models import Course, Section, User, UserProperties


@admin.register(User)
class UserAdmin(admin.ModelAdmin):
@admin.action(description="Archive selected users")
def archive_users(self, request, queryset):
archived_count, already_archived_count = User.archive_users(queryset)

if archived_count:
self.message_user(request, f"Archived {archived_count} users.")
if already_archived_count:
self.message_user(request, f"{already_archived_count} users already archived.", level=messages.WARNING)

# Render is_active using checkmarks or crosses
def user_active(self, obj):
return obj.is_active
Expand Down Expand Up @@ -42,6 +51,7 @@ def user_active(self, obj):
"nickname",
"student_id",
)
actions = ("archive_users",)


admin.site.register(UserProperties)
Expand Down
107 changes: 107 additions & 0 deletions intranet/apps/users/management/commands/archive_users.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import csv

from django.contrib.auth import get_user_model
from django.core.management.base import BaseCommand, CommandError


class Command(BaseCommand):
help = "Archive users (lock accounts) from a CSV file."

def add_arguments(self, parser):
parser.add_argument(
"--filename",
dest="filename",
type=str,
required=True,
help="Filename to import data from.",
)
parser.add_argument(
"--column-header",
dest="header",
default="username",
type=str,
help="Header associated with the identifier column in the CSV.",
)
parser.add_argument(
"--lookup-field",
dest="lookup_field",
default="username",
type=str,
help="User model field used to match identifiers (ex: username, student_id, email).",
)
parser.add_argument(
"--run",
action="store_true",
dest="run",
help="Actually run.",
)
parser.add_argument(
"--confirm",
action="store_true",
dest="confirm",
help="Skip confirmation prompt (only applies with --run).",
)

def ask(self, q) -> bool:
return input(f"{q} [y/N]: ").strip().lower() == "y"

def read_identifiers(self, filename: str, column_header: str):
identifiers = []
try:
with open(filename, encoding="utf-8") as csvfile:
reader = csv.DictReader(csvfile)
for row in reader:
value = row.get(column_header)
if value:
identifiers.append(value.strip())
except FileNotFoundError as e:
raise CommandError(f"File not found: {filename}") from e
except OSError as e:
raise CommandError(f"Error reading file: {e}") from e
return identifiers

def handle(self, *args, **options):
identifiers = self.read_identifiers(options["filename"], options["header"])
if not identifiers:
self.stdout.write(self.style.WARNING("No identifiers found in the CSV."))
return

total_identifiers = len(identifiers)
unique_identifiers = list(dict.fromkeys(identifiers))
duplicate_count = total_identifiers - len(unique_identifiers)

user_model = get_user_model()
lookup_field = options["lookup_field"]
valid_fields = {field.name for field in user_model._meta.get_fields()}
if lookup_field not in valid_fields:
raise CommandError(f"Invalid lookup field: {lookup_field}")
lookup_key = f"{lookup_field}__in"

users = user_model.objects.filter(**{lookup_key: unique_identifiers})
found_values = set(users.values_list(lookup_field, flat=True))
missing = sorted(value for value in unique_identifiers if value not in found_values)

self.stdout.write(f"Identifiers provided: {total_identifiers}")
self.stdout.write(f"Unique identifiers: {len(unique_identifiers)}")
if duplicate_count:
self.stdout.write(self.style.WARNING(f"Duplicate identifiers: {duplicate_count}"))
self.stdout.write(f"Matched users: {users.count()}")
self.stdout.write(f"Missing identifiers: {len(missing)}")
if missing:
self.stdout.write(self.style.WARNING(str(missing)))

if not options["run"]:
self.stdout.write("Dry run mode.")
return

if not options["confirm"]:
if not self.ask(
"This script will archive users (lock accounts). Ensure\nthat you know what you're doing before proceeding.\n\nContinue?"
):
self.stdout.write("Aborted.")
return

archived_count, already_archived_count = user_model.archive_users(users)
self.stdout.write(self.style.SUCCESS(f"Archived users: {archived_count}"))
if already_archived_count:
self.stdout.write(self.style.WARNING(f"Already archived users: {already_archived_count}"))
20 changes: 20 additions & 0 deletions intranet/apps/users/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,26 @@ def is_active(self) -> bool:

return not self.username.startswith("INVALID_USER") and not self.user_locked

@classmethod
def archive_users(cls, queryset, *, update_admin_comments: bool = False) -> tuple[int, int]:
to_archive = queryset.filter(user_locked=False)
already_archived_count = queryset.filter(user_locked=True).count()

if update_admin_comments:
current_year = timezone.localdate().year
previous_year = current_year - 1
archived_count = 0
for user in to_archive:
user.user_locked = True
if user.admin_comments:
user.admin_comments = f"\n=== {previous_year}-{current_year} comments ===\n{user.admin_comments}"
user.save(update_fields=["user_locked", "admin_comments"])
archived_count += 1
else:
archived_count = to_archive.update(user_locked=True)

return archived_count, already_archived_count

@property
def is_restricted(self) -> bool:
"""Checks if user needs the restricted view of Ion
Expand Down
Loading