Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ jobs:

- name: check out repository
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: 3.11

- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
Expand All @@ -31,6 +36,9 @@ jobs:
run: |
python tests/upload_server.py &

- name: Install dependencies
run: pip install .

- name: instal pytest
run: pip install pytest

Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ services:
target: development
image: tesp-api
environment:
- CONTAINER_TYPE=docker # Set to "docker", "singularity", or "both"
- CONTAINER_TYPE=singularity # Set to "docker", "singularity", or "both"
container_name: tesp-api
privileged: true
ports:
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ repository = "https://github.com/ndopj/tesp-api"

[tool.poetry.dependencies]
python = "^3.10.0"
aio_pika = "^9.5.7"
fastapi = "^0.75.1"
orjson = "^3.6.8"
gunicorn = "^20.1.0"
Expand Down
5 changes: 3 additions & 2 deletions settings.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
db.mongodb_uri = "mongodb://localhost:27017"
pulsar.url = "http://localhost:8913"
pulsar.status.poll_interval = 4
pulsar.status.max_polls = 100
pulsar.status.max_polls = 400
pulsar.client_timeout = 30

logging.level = "DEBUG"
logging.output_json = false
Expand All @@ -14,5 +15,5 @@ basic_auth.password = "password"

[dev-docker]
db.mongodb_uri = "mongodb://tesp-db:27017"
pulsar.url = "http://pulsar_rest:8913"
pulsar.url = "http://172.17.0.1:8913"
logging.output_json = false
25 changes: 14 additions & 11 deletions tesp_api/repository/task_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,20 @@ def cancel_task(
p_author: Maybe[str],
task_id: ObjectId
) -> Promise:
full_search_query = dict()
full_search_query.update({'_id': task_id})
full_search_query.update(p_author.maybe({}, lambda a: {'author': a}))

return Promise(lambda resolve, reject: resolve(full_search_query)) \
.then(self._tasks.find_one) \
.then(lambda _task: self.update_task(
{'_id': task_id},
{'$set': {'state': TesTaskState.CANCELED}}
)).map(lambda updated_task: updated_task
.map(lambda _updated_task: _updated_task.id))\
search_query = {
'_id': task_id,
'state': {'$in': [
TesTaskState.QUEUED,
TesTaskState.INITIALIZING,
TesTaskState.RUNNING
]}
}
search_query.update(p_author.maybe({}, lambda a: {'author': a}))
update_query = {'$set': {'state': TesTaskState.CANCELED}}

return self.update_task(search_query, update_query)\
.map(lambda updated_task: updated_task
.map(lambda _updated_task: _updated_task.id))\
.catch(handle_data_layer_error)


Expand Down
133 changes: 82 additions & 51 deletions tesp_api/service/event_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from tesp_api.repository.task_repository import task_repository
from tesp_api.service.file_transfer_service import file_transfer_service
from tesp_api.service.error import pulsar_event_handle_error, TaskNotFoundError, TaskExecutorError
from tesp_api.service.pulsar_operations import PulsarRestOperations, PulsarAmpqOperations, DataType
from tesp_api.service.pulsar_operations import PulsarRestOperations, PulsarAmqpOperations, DataType
from tesp_api.repository.model.task import (
TesTaskState,
TesTaskExecutor,
Expand All @@ -29,6 +29,7 @@

CONTAINER_TYPE = os.getenv("CONTAINER_TYPE", "docker")


@local_handler.register(event_name="queued_task")
def handle_queued_task(event: Event) -> None:
"""
Expand All @@ -39,8 +40,9 @@ def handle_queued_task(event: Event) -> None:
match pulsar_service.get_operations():
case PulsarRestOperations() as pulsar_rest_operations:
dispatch_event('queued_task_rest', {**payload, 'pulsar_operations': pulsar_rest_operations})
case PulsarAmpqOperations() as pulsar_ampq_operations:
dispatch_event('queued_task_ampq', {**payload, 'pulsar_operations': pulsar_ampq_operations})
case PulsarAmqpOperations() as pulsar_amqp_operations:
dispatch_event('queued_task_amqp', {**payload, 'pulsar_operations': pulsar_amqp_operations})


@local_handler.register(event_name="queued_task_rest")
async def handle_queued_task_rest(event: Event):
Expand All @@ -53,12 +55,37 @@ async def handle_queued_task_rest(event: Event):

print(f"Queued task rest: {task_id}")

await Promise(lambda resolve, reject: resolve(None))\
.then(lambda nothing: pulsar_operations.setup_job(task_id))\
.map(lambda setup_job_result: dispatch_event('initialize_task', {**payload, 'task_config': setup_job_result}))\
.catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda nothing: pulsar_operations.setup_job(task_id)) \
.map(lambda setup_job_result: dispatch_event('initialize_task', {**payload, 'task_config': setup_job_result})) \
.catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
.then(lambda x: x) # Invokes promise, potentially from error handler


@local_handler.register(event_name="queued_task_amqp")
async def handle_queued_task_amqp(event: Event):
"""
Sets up the job in Pulsar via AMQP operations and dispatches an 'initialize_task' event.
"""
event_name, payload = event
task_id: ObjectId = payload['task_id']
pulsar_operations: PulsarAmqpOperations = payload['pulsar_operations']

print(f"Queued task AMQP: {task_id}")

try:
# Setup job via AMQP
setup_job_result = await pulsar_operations.setup_job(task_id)

# Dispatch initialize event
await dispatch_event('initialize_task', {
**payload,
'task_config': setup_job_result
})
except Exception as error:
await pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)


@local_handler.register(event_name="initialize_task")
async def handle_initializing_task(event: Event) -> None:
"""
Expand All @@ -71,10 +98,10 @@ async def handle_initializing_task(event: Event) -> None:

# Merged Logic: Using the feature-complete setup_data from the new version
async def setup_data(job_id: ObjectId,
resources: TesTaskResources,
volumes: List[str],
inputs: List[TesTaskInput],
outputs: List[TesTaskOutput]):
resources: TesTaskResources,
volumes: List[str],
inputs: List[TesTaskInput],
outputs: List[TesTaskOutput]):
resource_conf: dict
volume_confs: List[dict] = []
input_confs: List[dict] = []
Expand Down Expand Up @@ -109,28 +136,29 @@ async def setup_data(job_id: ObjectId,
return resource_conf, volume_confs, input_confs, output_confs

print(f"Initializing task: {task_id}")
await Promise(lambda resolve, reject: resolve(None))\
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda nothing: task_repository.update_task_state(
task_id,
TesTaskState.QUEUED,
TesTaskState.INITIALIZING
)).map(lambda updated_task: get_else_throw(
updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED))
)).then(lambda updated_task: setup_data(
task_id,
maybe_of(updated_task.resources).maybe(None, lambda x: x),
maybe_of(updated_task.volumes).maybe([], lambda x: x),
maybe_of(updated_task.inputs).maybe([], lambda x: x),
maybe_of(updated_task.outputs).maybe([], lambda x: x)
)).map(lambda res_input_output_confs: dispatch_event('run_task', {
**payload,
'resource_conf': res_input_output_confs[0],
'volume_confs': res_input_output_confs[1],
'input_confs': res_input_output_confs[2],
'output_confs': res_input_output_confs[3]
})).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations))\
task_id,
TesTaskState.QUEUED,
TesTaskState.INITIALIZING
)).map(lambda updated_task: get_else_throw(
updated_task, TaskNotFoundError(task_id, Just(TesTaskState.QUEUED))
)).then(lambda updated_task: setup_data(
task_id,
maybe_of(updated_task.resources).maybe(None, lambda x: x),
maybe_of(updated_task.volumes).maybe([], lambda x: x),
maybe_of(updated_task.inputs).maybe([], lambda x: x),
maybe_of(updated_task.outputs).maybe([], lambda x: x)
)).map(lambda res_input_output_confs: dispatch_event('run_task', {
**payload,
'resource_conf': res_input_output_confs[0],
'volume_confs': res_input_output_confs[1],
'input_confs': res_input_output_confs[2],
'output_confs': res_input_output_confs[3]
})).catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
.then(lambda x: x)


@local_handler.register(event_name="run_task")
async def handle_run_task(event: Event) -> None:
"""
Expand All @@ -146,8 +174,8 @@ async def handle_run_task(event: Event) -> None:
input_confs: List[dict] = payload['input_confs']
output_confs: List[dict] = payload['output_confs']
pulsar_operations: PulsarRestOperations = payload['pulsar_operations']
run_command_str = None

run_command_str = None
command_start_time = datetime.datetime.now(datetime.timezone.utc)

try:
Expand Down Expand Up @@ -175,7 +203,7 @@ async def handle_run_task(event: Event) -> None:
)

stage_exec = TesTaskExecutor(image="willdockerhub/curl-wget:latest", command=[], workdir=Path("/downloads"))

# Stage-in command
stage_in_cmd = ""
stage_in_mount = ""
Expand Down Expand Up @@ -231,27 +259,28 @@ async def handle_run_task(event: Event) -> None:
command_status.get('returncode', -1)
)

current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id))
current_task_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
current_task_obj = get_else_throw(current_task_monad, TaskNotFoundError(task_id))

if current_task_obj.state == TesTaskState.CANCELED:
print(f"Task {task_id} found CANCELED after job completion polling. Aborting state changes.")
return
return

if command_status.get('returncode', -1) != 0:
print(f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). Setting state to EXECUTOR_ERROR.")
print(
f"Task {task_id} executor error (return code: {command_status.get('returncode', -1)}). Setting state to EXECUTOR_ERROR.")
await task_repository.update_task_state(task_id, TesTaskState.RUNNING, TesTaskState.EXECUTOR_ERROR)
await pulsar_operations.erase_job(task_id)
return
return

print(f"Task {task_id} completed successfully. Setting state to COMPLETE.")
await Promise(lambda resolve, reject: resolve(None)) \
.then(lambda ignored: task_repository.update_task_state(
task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE
)) \
task_id, TesTaskState.RUNNING, TesTaskState.COMPLETE
)) \
.map(lambda task_after_complete_update: get_else_throw(
task_after_complete_update, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING))
)) \
task_after_complete_update, TaskNotFoundError(task_id, Just(TesTaskState.RUNNING))
)) \
.then(lambda ignored: pulsar_operations.erase_job(task_id)) \
.catch(lambda error: pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)) \
.then(lambda x: x)
Expand All @@ -262,22 +291,24 @@ async def handle_run_task(event: Event) -> None:
await pulsar_operations.kill_job(task_id)
await pulsar_operations.erase_job(task_id)
print(f"Task {task_id} Pulsar job cleanup attempted after asyncio cancellation.")

except Exception as error:
print(f"Exception in handle_run_task for task {task_id}: {type(error).__name__} - {error}")

task_state_after_error_monad = await task_repository.get_task(maybe_of(author), {'_id': task_id})
if task_state_after_error_monad.is_just() and task_state_after_error_monad.value.state == TesTaskState.CANCELED:
print(f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.")
return
print(
f"Task {task_id} is already CANCELED. Exception '{type(error).__name__}' likely due to this. No further error processing by handler.")
return

print(f"Task {task_id} not CANCELED; proceeding with pulsar_event_handle_error for '{type(error).__name__}'.")
error_handler_result = pulsar_event_handle_error(error, task_id, event_name, pulsar_operations)
if asyncio.iscoroutine(error_handler_result) or isinstance(error_handler_result, _Promise):
await error_handler_result

try:
print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling in run_task.")
await pulsar_operations.erase_job(task_id)
except Exception as final_erase_error:
print(f"Error during final Pulsar erase attempt for task {task_id} after general error: {final_erase_error}")

# try:
# print(f"Ensuring Pulsar job for task {task_id} is erased after general error handling in run_task.")
# await pulsar_operations.erase_job(task_id)
# except Exception as final_erase_error:
# print(
# f"Error during final Pulsar erase attempt for task {task_id} after general error: {final_erase_error}")
Loading
Loading