
Questions & Answers

List only the subfolder names using Spark and Python on ADLS Gen2

I have a folder structure with a source, the year, the month, the day, and then a parquet file; I store data in a new folder every day.

Source

  • 2022
    • 12
      • 30
      • 31
  • 2023
    • 01
      • 01
      • 02
      • 03

Etc.

I need to be able to dynamically select the latest folder. In this scenario it's folder 2023/01/03, but I can't seem to get it out.

I've tried importing os and using the following code:

pq_date_folders = f'{abfss_path}/{var_table}/.'  

for root, dirs, files in os.walk(pq_date_folders, topdown=False):
    for name in dirs: 
        print(os.path.join(root, name))

But nothing gets printed. What am I doing wrong?

The data is stored in ADLS Gen2 and queried through Databricks using Python.

Comments:
2023-01-11 09:10:42
I have an answer on how to do it in Scala; I tested it on ADLS Gen2 and it works fine. Maybe you can try to implement something similar in Python: stackoverflow.com/questions/74179346/…
2023-01-11 09:10:42
Did you try using an actual Python library for ADLS? github.com/Azure/azure-data-lake-store-python
Answers (1):

The problem is that you are using the os library to do this. The Databricks cluster and the data lake are different machines on different networks; Databricks uses credentials to connect to the data lake and read the data, and you need to pass those credentials to any operation you want to run on that data. Fortunately, these credentials already exist in your Spark session, so you can use Hadoop with the Spark session configuration to query the data in your data lake.

I implemented a function that gets the max path under a directory; once we have the max path, we check its subdirectories and get the max path again, and so on (tested on Azure Databricks with an ADLS Gen2 data lake):

# First, make sure to install the hdfs library:
!pip install hdfs

Then:

# Function to get the max (lexicographically last) entry under a path:
def getLastPath(path, fs):
    # List every entry under the path and keep its full path as a string
    pathsList = [str(status.getPath()) for status in fs.listStatus(Path(path))]
    # The latest folder sorts last (e.g. 2023 > 2022, 03 > 01)
    return sorted(pathsList)[-1]

Then use it like this on the root path that contains the folders 2022, 2023, ...:

path = "dbfs:/mnt/xxx-dls/root_path/"
Path = spark.sparkContext._gateway.jvm.org.apache.hadoop.fs.Path
fs = Path(path).getFileSystem(sc._jsc.hadoopConfiguration())
while fs.isDirectory(Path(getLastPath(path, fs))):
  path = getLastPath(path, fs)
print(path)

Another option, if you are only using Databricks, is to use dbutils.fs.ls("/path/..") and get the max folder in each directory.
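For example, here is a minimal sketch of that approach (assuming a Databricks notebook where dbutils is available, and the same hypothetical mount point as above): starting from the root path, keep descending into the entry that sorts last by name until it is no longer a directory.

# Minimal sketch using dbutils; the mount point below is a placeholder
path = "dbfs:/mnt/xxx-dls/root_path/"
while True:
    entries = dbutils.fs.ls(path)
    if not entries:
        break
    # Folder names sort lexicographically, so the latest year/month/day comes last
    latest = sorted(entries, key=lambda f: f.name)[-1]
    if not latest.isDir():
        # We hit a file (the parquet file), so path is already the deepest folder
        break
    path = latest.path
print(path)  # e.g. dbfs:/mnt/xxx-dls/root_path/2023/01/03/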

Comments:
2023-01-11 09:10:42
When I try this out, I get the following error: py4j.security.Py4JSecurityException: Constructor public org.apache.hadoop.fs.Path(java.lang.String) throws java.lang.IllegalArgumentException is not whitelisted.
2023-01-11 09:10:42
@CMJ It seems like a credentials configuration issue between Databricks and the data lake. I'm not sure why; all I can find is this: stackoverflow.com/questions/55427770/…
2023-01-11 09:10:42
@CMJ try to post this issue on a separate thread maybe someone has an answer.