From dbfa3cbd0dfa08e031704867762b5e8f77aeddeb Mon Sep 17 00:00:00 2001 From: Mengjing Xu Date: Mon, 5 May 2025 17:25:23 -0700 Subject: [PATCH 1/5] Fix Fix issues with tensorflow tf generator support for Object Storage --- dlio_benchmark/data_generator/tf_generator.py | 4 +-- dlio_benchmark/framework/framework.py | 3 ++ dlio_benchmark/framework/tf_framework.py | 30 +++++++++++++++++-- dlio_benchmark/storage/file_storage.py | 4 +++ dlio_benchmark/storage/s3_storage.py | 4 +++ dlio_benchmark/storage/storage_handler.py | 7 +++++ 6 files changed, 48 insertions(+), 4 deletions(-) diff --git a/dlio_benchmark/data_generator/tf_generator.py b/dlio_benchmark/data_generator/tf_generator.py index b97c71ae..d4db97be 100644 --- a/dlio_benchmark/data_generator/tf_generator.py +++ b/dlio_benchmark/data_generator/tf_generator.py @@ -77,6 +77,6 @@ def generate(self): filename = os.path.basename(out_path_spec) self.storage.create_node(index_folder, exist_ok=True) tfrecord_idx = f"{index_folder}/{filename}.idx" - if not os.path.isfile(tfrecord_idx): - call([tfrecord2idx_script, out_path_spec, tfrecord_idx]) + if not self.storage.isfile(tfrecord_idx): + call([tfrecord2idx_script, out_path_spec, self.storage.get_uri(tfrecord_idx)]) np.random.seed() diff --git a/dlio_benchmark/framework/framework.py b/dlio_benchmark/framework/framework.py index f37a62b9..80a5729c 100644 --- a/dlio_benchmark/framework/framework.py +++ b/dlio_benchmark/framework/framework.py @@ -105,3 +105,6 @@ def put_data(self, id, data, offset=None, length=None): def get_data(self, id, data, offset=None, length=None): return None + def isfile(self, id): + return False + diff --git a/dlio_benchmark/framework/tf_framework.py b/dlio_benchmark/framework/tf_framework.py index 175ddbb9..99157d6a 100644 --- a/dlio_benchmark/framework/tf_framework.py +++ b/dlio_benchmark/framework/tf_framework.py @@ -31,7 +31,9 @@ DataLoaderType import tensorflow as tf +import tensorflow_io as tfio from tensorflow.python.framework import errors +import boto3 tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) @@ -52,6 +54,9 @@ def __init__(self, profiling): else: self.tensorboard = ProfilerFactory.get_profiler(Profiler.TENSORBOARD) self.reader_handler = None + self.s3 = boto3.client("s3", \ + endpoint_url=os.getenv('S3_ENDPOINT'), \ + region_name=os.getenv('AWS_REGION')) @dlp.log def init_loader(self, format_type, epoch=0, data_loader=None): @@ -102,7 +107,7 @@ def is_nativeio_available(self): @dlp.log def create_node(self, id, exist_ok=False): - tf.io.gfile.mkdir(id) + tf.io.gfile.makedirs(id) return True @dlp.log @@ -119,7 +124,24 @@ def get_node(self, id): def walk_node(self, id, use_pattern=False): try: if not use_pattern: - return tf.io.gfile.listdir(id) + # parse id to get bucket name and prefix + scheme_end = id.find('://') + 2 + bucket_end = id.find('/', scheme_end + 1) + bucket = id[scheme_end + 1 : bucket_end] + prefix = id[bucket_end + 1 :] + if not prefix.endswith('/'): + prefix = prefix + '/' + + resp = [] + paginator = self.s3.get_paginator('list_objects_v2') + pages = paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/') + for page in pages: + if page['KeyCount'] == 0: + continue + for obj in page['Contents']: + filename = obj['Key'].split('/')[-1] + resp.append(filename) + return resp else: return tf.io.gfile.glob(id) except errors.NotFoundError: @@ -140,3 +162,7 @@ def get_data(self, id, data, offset=None, length=None): with tf.io.gfile.GFile(id, "r") as fd: data = fd.read() return data + + @dlp.log + def isfile(self, id): + return tf.io.gfile.exists(id) and not tf.io.gfile.isdir(id) diff --git a/dlio_benchmark/storage/file_storage.py b/dlio_benchmark/storage/file_storage.py index 8b2aa6a6..19208975 100644 --- a/dlio_benchmark/storage/file_storage.py +++ b/dlio_benchmark/storage/file_storage.py @@ -99,5 +99,9 @@ def get_data(self, id, data, offset=None, length=None): data = fd.read() return data + @dlp.log + def isfile(self, id): + return os.path.isfile(id) + def get_basename(self, id): return os.path.basename(id) diff --git a/dlio_benchmark/storage/s3_storage.py b/dlio_benchmark/storage/s3_storage.py index f28c2eaa..1e76bd52 100644 --- a/dlio_benchmark/storage/s3_storage.py +++ b/dlio_benchmark/storage/s3_storage.py @@ -72,5 +72,9 @@ def put_data(self, id, data, offset=None, length=None): def get_data(self, id, data, offset=None, length=None): return super().get_data(self.get_uri(id), data, offset, length) + @dlp.log + def isfile(self, id): + return super().isfile(self.get_uri(id)) + def get_basename(self, id): return os.path.basename(id) \ No newline at end of file diff --git a/dlio_benchmark/storage/storage_handler.py b/dlio_benchmark/storage/storage_handler.py index 44da1db8..3dd084fa 100644 --- a/dlio_benchmark/storage/storage_handler.py +++ b/dlio_benchmark/storage/storage_handler.py @@ -123,3 +123,10 @@ def get_data(self, id, data, offset=None, length=None): return self.framework.get_data(id, data, offset, length) return None + def isfile(self, id): + """ + This method checks if the given path is a file + """ + if self.is_framework_nativeio_available: + return self.framework.isfile(id) + return None From e27471ae18ac149d93e8dfdce39f35829c7ea6bd Mon Sep 17 00:00:00 2001 From: Mengjing Xu Date: Wed, 7 May 2025 18:27:12 -0700 Subject: [PATCH 2/5] add tensorflow_io and boto3 to requirements.txt --- requirements.txt | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 11c7539c..8eaf3dd6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,7 +12,9 @@ pandas>=1.5.1 psutil>=5.9.8 pydftracer==1.0.11 pytest -tensorflow>=2.11.0 +tensorflow==2.13.1 +tensorflow_io==0.33.0 torch>=2.2.0 torchaudio torchvision +boto3 From 1e2d4e5dd13d2418c466f9552c17fdf92e0618c2 Mon Sep 17 00:00:00 2001 From: Mengjing Xu Date: Wed, 7 May 2025 18:33:18 -0700 Subject: [PATCH 3/5] update packages in setup.py --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 7df0cff7..69596004 100644 --- a/setup.py +++ b/setup.py @@ -17,11 +17,13 @@ "omegaconf>=2.2.0", "pandas>=1.5.1", "psutil>=5.9.8", + "boto3", ] x86_deps = [ f"hydra-core>={HYDRA_VERSION}", "nvidia-dali-cuda120>=1.34.0", - "tensorflow>=2.11.0", + "tensorflow==2.13.1", + "tensorflow_io==0.33.0", "torch>=2.2.0", "torchaudio", "torchvision", From 70f04560b179bea86ad93236b1030afb81741c2f Mon Sep 17 00:00:00 2001 From: Mengjing Xu Date: Mon, 12 May 2025 22:17:46 -0700 Subject: [PATCH 4/5] relax version constraints --- requirements.txt | 4 ++-- setup.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/requirements.txt b/requirements.txt index 8eaf3dd6..6be14162 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,8 +12,8 @@ pandas>=1.5.1 psutil>=5.9.8 pydftracer==1.0.11 pytest -tensorflow==2.13.1 -tensorflow_io==0.33.0 +tensorflow>=2.13.1 +tensorflow_io>=0.33.0 torch>=2.2.0 torchaudio torchvision diff --git a/setup.py b/setup.py index 69596004..1e9c71b7 100644 --- a/setup.py +++ b/setup.py @@ -22,8 +22,8 @@ x86_deps = [ f"hydra-core>={HYDRA_VERSION}", "nvidia-dali-cuda120>=1.34.0", - "tensorflow==2.13.1", - "tensorflow_io==0.33.0", + "tensorflow>=2.13.1", + "tensorflow_io>=0.33.0", "torch>=2.2.0", "torchaudio", "torchvision", From 4f5b406b9da143ec1180a9f941237ba9739a368d Mon Sep 17 00:00:00 2001 From: Mengjing Xu Date: Sun, 18 May 2025 23:00:22 -0700 Subject: [PATCH 5/5] revert changes for walk_node --- dlio_benchmark/framework/tf_framework.py | 23 +---------------------- requirements.txt | 1 - setup.py | 1 - 3 files changed, 1 insertion(+), 24 deletions(-) diff --git a/dlio_benchmark/framework/tf_framework.py b/dlio_benchmark/framework/tf_framework.py index 99157d6a..6566ab39 100644 --- a/dlio_benchmark/framework/tf_framework.py +++ b/dlio_benchmark/framework/tf_framework.py @@ -33,7 +33,6 @@ import tensorflow as tf import tensorflow_io as tfio from tensorflow.python.framework import errors -import boto3 tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR) @@ -54,9 +53,6 @@ def __init__(self, profiling): else: self.tensorboard = ProfilerFactory.get_profiler(Profiler.TENSORBOARD) self.reader_handler = None - self.s3 = boto3.client("s3", \ - endpoint_url=os.getenv('S3_ENDPOINT'), \ - region_name=os.getenv('AWS_REGION')) @dlp.log def init_loader(self, format_type, epoch=0, data_loader=None): @@ -124,24 +120,7 @@ def get_node(self, id): def walk_node(self, id, use_pattern=False): try: if not use_pattern: - # parse id to get bucket name and prefix - scheme_end = id.find('://') + 2 - bucket_end = id.find('/', scheme_end + 1) - bucket = id[scheme_end + 1 : bucket_end] - prefix = id[bucket_end + 1 :] - if not prefix.endswith('/'): - prefix = prefix + '/' - - resp = [] - paginator = self.s3.get_paginator('list_objects_v2') - pages = paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter='/') - for page in pages: - if page['KeyCount'] == 0: - continue - for obj in page['Contents']: - filename = obj['Key'].split('/')[-1] - resp.append(filename) - return resp + return tf.io.gfile.listdir(id) else: return tf.io.gfile.glob(id) except errors.NotFoundError: diff --git a/requirements.txt b/requirements.txt index 6be14162..8eb1a5d3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,4 +17,3 @@ tensorflow_io>=0.33.0 torch>=2.2.0 torchaudio torchvision -boto3 diff --git a/setup.py b/setup.py index 1e9c71b7..3b694eff 100644 --- a/setup.py +++ b/setup.py @@ -17,7 +17,6 @@ "omegaconf>=2.2.0", "pandas>=1.5.1", "psutil>=5.9.8", - "boto3", ] x86_deps = [ f"hydra-core>={HYDRA_VERSION}",