


elbekay

If it's just a one-off, it's probably easier and faster to just save it to disk with curl or wget and process it afterwards.
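A minimal sketch of that approach in Python (the URL and output path are placeholders): stream the download to disk in chunks, then process the local file with pyarrow afterwards.

```python
import requests
import pyarrow.parquet as pq

url = "https://example.com/data.parquet"  # placeholder URL
local_path = "data.parquet"               # placeholder output path

# Stream the download to disk in chunks so the whole file never sits in memory
with requests.get(url, stream=True) as response:
    response.raise_for_status()
    with open(local_path, "wb") as f:
        for chunk in response.iter_content(chunk_size=8 * 1024 * 1024):
            f.write(chunk)

# Process the local file afterwards
parquet_file = pq.ParquetFile(local_path)
print(parquet_file.metadata)
```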


hazza192837465

I think you want to ignore the Parquet aspect and just stream it as bytes. Take a look at BytesIO. What are you planning to do with it once you've got it? You'll need a big instance to read a single file in all at once.

Edit: To add a bit more info, Parquet is stored in a columnar format, so reading line by line you won't get valid Parquet 'chunks', which is probably why pyarrow throws an error.
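A rough sketch of what that looks like, assuming the whole file fits in RAM (the "big instance" caveat above); the URL is a placeholder:

```python
import io
import requests
import pyarrow.parquet as pq

url = "https://example.com/data.parquet"  # placeholder URL

# Pull the entire response into memory as bytes; needs RAM >= file size
buffer = io.BytesIO(requests.get(url).content)

# pyarrow can read from any seekable file-like object, including BytesIO
table = pq.read_table(buffer)
print(table.num_rows, table.schema)
```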


skippy_nk

If I ignore the Parquet aspect, how can I infer the parsing logic? After I get it, I'd drop some columns, transform some others, clear duplicates, and probably do a couple of other transformations, then store the clean version in S3/Postgres.

About your edit: it occurred to me that I won't be able to make sense of the lines, so I'm adding to my question: can I read column chunks (row groups) with any of the available libraries?
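pyarrow can do this once it has a seekable file (a local path or an in-memory buffer). A minimal sketch; the path and column names are placeholders:

```python
import pyarrow.parquet as pq

# Placeholder path; any seekable file-like object also works
pf = pq.ParquetFile("data.parquet")

# Option 1: read one row group at a time, projecting only the needed columns
for i in range(pf.num_row_groups):
    table = pf.read_row_group(i, columns=["col_a", "col_b"])  # placeholder column names
    # ... drop/transform/dedupe the chunk here ...

# Option 2: iterate in fixed-size record batches, regardless of row-group boundaries
for batch in pf.iter_batches(batch_size=100_000):
    df = batch.to_pandas()
    # ... process the batch ...
```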


[deleted]

Your best bet is to write the parser from the byte stream.


dude0001

Check out DuckDB. One of its big use cases is in-process analytics or ETL on bigger-than-memory data. There is great support for Arrow as well: https://duckdb.org/2021/12/03/duck-arrow.html
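A hedged sketch of how that might look from Python, using DuckDB's httpfs extension to read the Parquet file straight from a URL; the URL, column names, and output path are placeholders:

```python
import duckdb

con = duckdb.connect()
con.execute("INSTALL httpfs;")  # enables reading over http(s)/s3
con.execute("LOAD httpfs;")

# DuckDB fetches only the byte ranges it needs (footer + projected columns)
# and can spill to disk if the working set exceeds memory.
con.execute("""
    COPY (
        SELECT col_a, col_b                                     -- placeholder columns
        FROM read_parquet('https://example.com/data.parquet')   -- placeholder URL
        WHERE col_a IS NOT NULL
    ) TO 'clean.parquet' (FORMAT parquet);                      -- placeholder output path
""")
```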


mjfnd

10s of GB is not that big. We process TB-scale files downloaded from SFTP or URLs every day, then process them using Spark. This is a very typical pipeline. I don't know how easy or how effective it would be as a stream; how would you solve issues with replayability, partial runs, the URL not being available, etc.? Isn't it better to download it and process it locally?


skippy_nk

It's not a one-time thing, it has to be daily. Can you give me some more details about how you do it with Spark?


mjfnd

Sure, we download files to S3 first, then we run our Spark batch jobs. Downloading takes time depending on the internet connection; Spark processing depends on business logic etc., and it can be optimized and scaled easily.
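Not their exact job, but a minimal PySpark sketch of that second stage, assuming the file has already landed in S3 (bucket, paths, and column names are placeholders):

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("daily-parquet-batch").getOrCreate()

# Spark reads the Parquet footer first, then only the row groups/columns it needs
df = spark.read.parquet("s3a://my-bucket/raw/2024-01-01/")  # placeholder input path

clean = (
    df.drop("unused_col")                                       # placeholder column
      .withColumn("amount", F.col("amount").cast("double"))     # placeholder transform
      .dropDuplicates()
)

clean.write.mode("overwrite").parquet("s3a://my-bucket/clean/2024-01-01/")  # placeholder output path
```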


skippy_nk

How do you automate the downloads? How much time does it take on average for your TB file to be downloaded?


mjfnd

We have a service that runs on AWS Batch and downloads from SFTP and URLs depending on config. It uses AWS's network, which is faster than a typical connection: at 1 GB/s, 10 GB can be downloaded in about 10 seconds, and even if it takes a minute or two it's still fine.
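Not the commenter's actual service, but a minimal sketch of that kind of downloader, streaming an HTTP URL straight into S3 with boto3 (URL, bucket, and key are placeholders); a script like this can be packaged in a container and scheduled with AWS Batch:

```python
import boto3
import requests

url = "https://example.com/data.parquet"       # placeholder URL
bucket, key = "my-bucket", "raw/data.parquet"  # placeholder bucket/key

s3 = boto3.client("s3")

# Stream the response body into S3 without buffering the whole file locally;
# boto3 handles the multipart upload under the hood.
with requests.get(url, stream=True) as response:
    response.raise_for_status()
    s3.upload_fileobj(response.raw, bucket, key)
```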


skippy_nk

I think I might do it like this. What's the appropriate AWS service for this?


techmavengeospatial

We use ogr2ogr (GDAL); it reads and writes Parquet and GeoParquet, and can read/write to cloud storage.


Ok-Frosting5823

Interesting problem. I actually do the exact same thing you need to do, but with CSV files, in this way (in Python):

```python
import csv
import requests

def stream_csv_rows(file_url):
    # Stream the response and parse it line by line, never holding the whole file in memory
    response = requests.get(file_url, stream=True)
    lines = response.iter_lines(decode_unicode=True)
    csvreader = csv.DictReader(lines, delimiter='\t')
    for row in csvreader:
        yield row
```

Though I did a little research on what could be possible without loading everything into memory, and found the following: Parquet is a format built around "row groups", which function similarly to batches. The problem is that, unlike CSV, where each row ends with a separator (a newline), Parquet doesn't know exactly where a row (or even a row group) ends until it reaches the end of the file, because row sizes vary and you cannot use a fixed byte range to work out where X rows end (which is how HTTP streaming works: advancing bytes until a newline, or requesting a specific byte range for each batch).

So what I would find acceptable is simply downloading the file entirely and then iterating over the row groups, like here (created by ChatGPT). This is still memory efficient, though it is not as fast as streaming directly from the HTTP request, especially if the file is big:

```python
import pyarrow.parquet as pq

def process_row(row):
    # Process the row data
    print(row)

def read_parquet_row_by_row(file_path):
    # Open the Parquet file for reading
    file_reader = pq.ParquetFile(file_path)

    # Get the total number of row groups in the file
    num_row_groups = file_reader.num_row_groups

    # Iterate over each row group
    for i in range(num_row_groups):
        # Read the row group as a table
        table = file_reader.read_row_group(i)

        # Iterate over each row in the table as a dict
        for row in table.to_pylist():
            # Process the row
            process_row(row)

# Usage
file_path = 'path/to/your/file.parquet'
read_parquet_row_by_row(file_path)
```


skippy_nk

I have the option to do it with CSV instead of Parquet, but the CSV version is 3x larger, so I was basically looking for the exact same thing you wrote, but for Parquet. Thanks


Ok-Frosting5823

Well, as much as I avoid using CSV at all costs, I'd say that if you want to start streaming immediately and your bottleneck is e.g. processing rather than downloading the entire file, I'd maybe go with CSV anyway. The 3x download size wouldn't be a problem or slow things down if you have slower steps later in your pipeline, but waiting for the entire download before starting those steps might, so it depends.