Skip to content

Fallback janitor job two #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 30 commits into
base: 2.0
Choose a base branch
from

Conversation

SashankBalusu
Copy link

@SashankBalusu SashankBalusu commented Mar 17, 2025

Secondary Janitor Job to Handle Timed-Out and Failed Transactions

Description:

This PR enhances the janitor job by implementing the handling of timed-out and failed transactions. It adds logic to:

  • Move transactions that exceed the timeout threshold from running/ to failed/.
  • Clean up associated metafiles and locator files for failed transactions.
  • Perform a brute-force search to remove lingering metafiles tied to failed transactions.
  • These changes improve the reliability of the janitor cleanup process.

Changes Made:

Janitor_delete_timed_out_transaction:

  • Goes through the in progress transaction folder and checks to see if the heartbeat timeout (stored in the filename) has been reached for any files
  • If the heartbeat timeout has been reached, the files are moved to the failed directory with TIMEOUT_TXN constant appended to the file name
  • Uses brute_force_search_matching_metafiles to delete orphaned metafiles.

Janitor_remove_files_in_failed:

  • Reads failed transactions and removes their associated metafiles.
  • Renames cleaned-up failed transactions to append a SUCCESSFULLY_CLEANED tag. This ensures that when iterating through the failed transaction directory we don't attempt to delete already cleaned failed transactions.
  • While iterating through the failed transaction directory to check for unsuccessful cleanings we also look for a transaction that was in progress that gets marked as failed because their heartbeat timeout passed but the file was unsuccessfully cleaned
  • For these files, we simply call brute_force_search_matching_metafiles like usual.

Brute_force_search_matching_metafiles:

  • Recursively searches directories to delete any metafiles matching failed transaction id

Added Transaction Status Tags:

  • Introduced constants:
  • OPERATION_TIMEOUTS = {
    "create": 5,
    "update": 3,
    "delete": 4,
    "read_siblings": 2,
    "read_children": 2,
    "read_latest": 3,
    "read_exists": 1,
    }
    • sets constant limits for how long each operation is expected to take
  • SUCCESSFULLY_CLEANED = "cleaned"
  • CURRENTLY_CLEANING = "working"
  • TIMEOUT_TXN = “timedout”
  • Appended these tags to transaction filenames to track their status:
    • Timed out: f"{id}{TXN_PART_SEPARATOR}{TIMEOUT_TXN}"
      • Identifies case where transaction times out and is caught by heartbeat failed
    • Failed but associated files not cleaned:f"{id}{TXN_PART_SEPARATOR}{CURRENTLY_CLEANING}"
      • Identifies case where process crashes causing transaction to be moved to failed but not properly deleted
    • Deleted: f"{id}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}"
      • Identifies when a failed transaction is successfully deleted
        This allows us to identify failed transactions without having to read the transaction file

SashankBalusu and others added 25 commits February 27, 2025 13:32
…s catalog so that I can run the janitor job in order to execute the test
… current implementation will move all running transactions that have timed out will be moved into the failed transactions directory. Unfinished: need to ensure that I am correctly creating transactions that I can test
…tor job removes failed transactions that have not yet been deleted from the failed transaction directory
…is function will iterate through failed transactions directory and ensure that each file within this directory is properly deleted
…eck each transaction that has not been deleted and ensure id is not in failed transactions directory
…roach and instead took path to deserialized transaction and used read class method from transaction.py toaccess operations from transaction so that they can be deleted
…sfully deleted, has timed out, or has failed and not been deleted yet
@pdames pdames requested review from pdames and Zyiqin-Miranda March 17, 2025 18:11
Copy link
Member

@Zyiqin-Miranda Zyiqin-Miranda left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for putting together the implementation for the fallback janitor! Looking forward to see how/where this janitor is called and integrated with existing failed transaction cleanup logic inside Transaction.commit() functions.

One high-level comment is the placement of the janitor, whether it should be part of the deltacat/storage/model/transaction, since it's a cleaning up job for transaction specifically instead of a general compute job? Curious to learn your opinion here.

@@ -78,7 +78,7 @@ Note that (except **Immutable ID**) this is the same format used by **Metadata R

### Transaction Log Directory
The **Transaction Log Directory** (`${CATALOG_ROOT}/txn`) is a special directory in the **Catalog Root Directory** which holds all successfully committed transactions. It contains one **Transaction Log File** per successful transaction recording transaction details.
```
``
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accidentally delete a backtick here?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like it! Fixed.

TIMEOUT_TXN = "timedout"

#operation timeout constants
OPERATION_TIMEOUTS = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you briefly explain how are these timeout values generated? Are these just like place holder values right now or the value actually means that should be amount of time it takes to complete the operation?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are fixed values that we decided to use. We picked arbitrarily large values per Patrick's advice, but @SashankBalusu is currently working on something to get more accurate values, that he could give more information on in a bit.

It is difficult to determine a timeout time based on the size of the data, however, so these values will be constant across all transaction operations once they are finalized.

total_time_for_transaction += OPERATION_TIMEOUTS.get(operation.type, 0)

start_time = float(self.id.split(TXN_PART_SEPARATOR)[0])
final_time_heartbeat = start_time + (total_time_for_transaction * 1_000_000_000) # Convert seconds to nanoseconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we replace the number 1_000_000_000 with the imported constant NANOS_PER_SEC?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, done!

@@ -591,6 +619,7 @@ def _commit_write(
locator_write_paths = []
try:
for operation in self.operations:
total_time_for_transaction += OPERATION_TIMEOUTS[operation.type]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this total_time_for_transaction get aggregated second time here and is not used anywhere downstream?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is not used anywhere downstream. Since we have the for loop above on line 603 and 604 summing the heartbeat timeout, we will remove this one.


# Transaction Status constants
SUCCESSFULLY_CLEANED = "cleaned"
CURRENTLY_CLEANING = "working"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: working seems a bit unclear to me whether it's the janitor job that's working on this file or the transaction is a working in progress. How about cleaning or something else that indicates it's a janitor job?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, changed to "cleaning."

@@ -410,6 +415,20 @@ def end_time(self) -> Optional[int]:
Returns the end time of the transaction.
"""
return self.get("end_time")

def _mark_status(self, status: str) -> None:
Copy link
Member

@Zyiqin-Miranda Zyiqin-Miranda Mar 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious how/when is this function being called? In addition, for the status of the transaction, would it look more clear that we have a another enum like TransactionStatus==SUCCESS as additional class attribute?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function was used in a previous implementation of the janitor job, but was not removed when we changed it. We have deleted it now as it is no longer in use. All tests still pass!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to delete the _mark_start_time method or just create a new _mark_status method? The _mark_start_time method is still used by transaction commit, so it seems like the current code will be broken until we add it back.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I did not mean to delete that, I meant to delete _mark_status instead, since it is not used anywhere else in the code.

I've fixed this change!

if transaction_id in base_name:
try:
filesystem.delete_file(entry.path)
print(f"Deleted file: {entry.path}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we switch to use DeltaCAT logger instead of print statement?

import logging
from deltacat import logs
logger = logs.configure_deltacat_logger(logging.getLogger(__name__))

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, thanks for catching that. We've changed all print statements in this file to loggers, setting levels as we saw fit, but please double check us on that!


txn_filenames = []
for txn_id in txn_ids:
txn_filename = f"{start_time}{txn_id}{TXN_PART_SEPARATOR}{TXN_PART_SEPARATOR}{time.time_ns()}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this filename be f"{start_time}{TXN_PART_SEPARATOR}{txn_id}{TXN_PART_SEPARATOR}{time.time_ns()}" instead?

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes! We fixed it, and the tests are still passing.

@pdames
Copy link
Member

pdames commented Mar 24, 2025

Thanks for putting together the implementation for the fallback janitor! Looking forward to see how/where this janitor is called and integrated with existing failed transaction cleanup logic inside Transaction.commit() functions.

One high-level comment is the placement of the janitor, whether it should be part of the deltacat/storage/model/transaction, since it's a cleaning up job for transaction specifically instead of a general compute job? Curious to learn your opinion here.

I think compute should be the right module for it, since I expect the compute module to hold all DeltaCAT "stored-procedure-like-jobs" that we expect to periodically invoke w/ an external orchestrator (i.e., manually invoked by the end-user, automatically invoked by daily cron jobs for local catalogs, a distributed AWS Lambda invoked by a Step Function workflow, etc.). Over time, I expect the janitor job to also evolve to distribute its work using Ray/Daft/etc. to more efficiently cleanup potentially vast, exabyte-scale, distributed catalogs stored in S3 or other cloud storage services.

@Zyiqin-Miranda
Copy link
Member

I think compute should be the right module for it, since I expect the compute module to hold all DeltaCAT "stored-procedure-like-jobs" that we expect to periodically invoke w/ an external orchestrator (i.e., manually invoked by the end-user, automatically invoked by daily cron jobs for local catalogs, a distributed AWS Lambda invoked by a Step Function workflow, etc.). Over time, I expect the janitor job to also evolve to distribute its work using Ray/Daft/etc. to more efficiently cleanup potentially vast, exabyte-scale, distributed catalogs stored in S3 or other cloud storage services.

Thanks for the context, putting under compute makes sense with janitor invoked as scheduled jobs/distributed jobs.

…tor_handles_empty_directories, and altered implementation of brute force search
@025rhu 025rhu force-pushed the fallback-janitor-job-two branch from 6647d87 to 5ecea33 Compare April 3, 2025 04:44
@@ -410,6 +415,20 @@ def end_time(self) -> Optional[int]:
Returns the end time of the transaction.
"""
return self.get("end_time")

def _mark_status(self, status: str) -> None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to delete the _mark_start_time method or just create a new _mark_status method? The _mark_start_time method is still used by transaction commit, so it seems like the current code will be broken until we add it back.

for operation in self.operations:
total_time_for_transaction += OPERATION_TIMEOUTS.get(operation.type, 0)

start_time = float(self.id.split(TXN_PART_SEPARATOR)[0])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this just be a call to self.start_time()?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, just made this change

Comment on lines 588 to 595
total_time_for_transaction = 0
for operation in self.operations:
total_time_for_transaction += OPERATION_TIMEOUTS.get(operation.type, 0)

start_time = float(self.id.split(TXN_PART_SEPARATOR)[0])
final_time_heartbeat = start_time + (
total_time_for_transaction * NANOS_PER_SEC
) # Convert seconds to nanoseconds
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add all logic for converting the heartbeat timeout time to a timeout_time method in TransactionSystemTimeProvider, since this method of calculation is only relevant for transaction time providers that use system clock nanos.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just made this edit.

Comment on lines +107 to +109
SUCCESSFULLY_CLEANED = "cleaned"
CURRENTLY_CLEANING = "cleaning"
TIMEOUT_TXN = "timedout"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move these to a new class TransactionState(str, Enum) type in types.py, and change the supported values to simply:
FAILED (equivalent to CLEANING), PURGED (equivalent to CLEANED), TIMEOUT, RUNNING, and SUCCESS?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Patrick, I added this to types.py

failed_txn_log_dir,
self.id,
)
path_ending = f"{self.id}{TXN_PART_SEPARATOR}{CURRENTLY_CLEANING}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of attaching the "CLEANING" and "CLEANED" states to the transaction file directly (which requires subsequent file movements/renames to apply), what do you think about just writing a transaction.state() helper method that retrieves the current transaction state?

We can change the CLEANING state to FAILED, and infer it as the transaction ID being present in both the failed and running directories, then change CLEANED to PURGED and infer it as the transaction ID ONLY being present in the failed directory. SUCCESS would be inferred as the transaction ID being present in the success directory, and RUNNING would be inferred as the transaction ID being present in the running directory (unless it is also present in the failed directory or, in other words, we should have an if/else check that checks if the directory is in the failed dir first, and otherwise falls back to see if it's running).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey Patrick, I’ve gone ahead and implemented it. I created a transaction.state() helper method that determines the current state based on the presence of the transaction ID in the appropriate directories.

Copy link
Member

@pdames pdames left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a good start on the problem! Please remove dead/commented-out code and resolve merge conflicts, then I'll get these changes merged!

Comment on lines +10 to +12
# CURRENTLY_CLEANING,
# SUCCESSFULLY_CLEANED,
# TIMEOUT_TXN,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# CURRENTLY_CLEANING,
# SUCCESSFULLY_CLEANED,
# TIMEOUT_TXN,

@@ -15,17 +15,23 @@
import pyarrow.fs

from deltacat.constants import (
# CURRENTLY_CLEANING,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# CURRENTLY_CLEANING,

@@ -15,17 +15,23 @@
import pyarrow.fs

from deltacat.constants import (
# CURRENTLY_CLEANING,
OPERATION_TIMEOUTS,
# SUCCESSFULLY_CLEANED,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# SUCCESSFULLY_CLEANED,

@@ -610,10 +672,14 @@ def _commit_write(
)
except Exception:
# write a failed transaction log file entry
#path_ending = f"{self.id}{TXN_PART_SEPARATOR}{CURRENTLY_CLEANING}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#path_ending = f"{self.id}{TXN_PART_SEPARATOR}{CURRENTLY_CLEANING}"

@@ -639,12 +706,19 @@ def _commit_write(
# delete the in-progress transaction log file entry
filesystem.delete_file(running_txn_log_file_path)
# failed transaction cleanup is now complete
old_path = failed_txn_log_file_path
#new_path = posixpath.join(failed_txn_log_dir, f"{self.id}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#new_path = posixpath.join(failed_txn_log_dir, f"{self.id}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}")


# Run the janitor job that should:
# 1. Move the running txn file to the failed directory with TIMEOUT_TXN appended.
# 2. Invoke brute force search which deletes the metafile and renames the txn log file to use SUCCESSFULLY_CLEANED.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# 2. Invoke brute force search which deletes the metafile and renames the txn log file to use SUCCESSFULLY_CLEANED.
# 2. Invoke brute force search to deletes the metafiles and cleans up txn log files.

Comment on lines +72 to +73
# Expected name: original txn_filename with TIMEOUT_TXN replaced by SUCCESSFULLY_CLEANED.
# new_txn_file_name = f"{txn_filename}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Expected name: original txn_filename with TIMEOUT_TXN replaced by SUCCESSFULLY_CLEANED.
# new_txn_file_name = f"{txn_filename}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}"

# Run the janitor function to move timed-out transactions to the failed directory
janitor_delete_timed_out_transaction(temp_dir)

# Verify that all transactions were moved to the failed directory with SUCCESSFULLY_CLEANED appended
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# Verify that all transactions were moved to the failed directory with SUCCESSFULLY_CLEANED appended
# Verify that all transactions were moved to the failed directory

# Verify that all transactions were moved to the failed directory with SUCCESSFULLY_CLEANED appended
for txn_filename, txn_path, test_metafile_path in txn_filenames:
new_txn_filename = (
# f"{txn_filename}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# f"{txn_filename}{TXN_PART_SEPARATOR}{SUCCESSFULLY_CLEANED}"

current_time = time.time_ns()
if end_time <= current_time:
src_path = running_txn_info.path
#new_filename = f"{filename}{TXN_PART_SEPARATOR}{TIMEOUT_TXN}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#new_filename = f"{filename}{TXN_PART_SEPARATOR}{TIMEOUT_TXN}"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants