Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix TextIO not fully reading a GCS file when decompressive transcoding happens #33384

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

shunping
Copy link
Contributor

@shunping shunping commented Dec 15, 2024

For GCS, we determine the splittability based on whether a file meets decompressive transcoding criteria.

When decompressive transcoding occurs, the size returned from metadata (i.e. the gzipped file size) does not match the size of the content returned (i.e. original data size). In this case, we force the source to be unsplittable to ensure all its content is read.

fixes #31040


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

For GCS, we determine the splittability based on whether the file
meets decompressive transcoding criteria.

When decompressive transcoding occurs, the size returned from
metadata (gzip file size) does not match the size of the content
returned (original data). In this case, we set the source to
unsplittable to ensure all its content is read.
@shunping shunping marked this pull request as draft December 15, 2024 05:14
Copy link

codecov bot commented Dec 15, 2024

Codecov Report

Attention: Patch coverage is 52.63158% with 9 lines in your changes missing coverage. Please review.

Project coverage is 57.39%. Comparing base (d7502fa) to head (40f9a6d).
Report is 27 commits behind head on master.

Files with missing lines Patch % Lines
sdks/python/apache_beam/io/gcp/gcsfilesystem.py 14.28% 6 Missing ⚠️
sdks/python/apache_beam/io/filebasedsource.py 50.00% 2 Missing ⚠️
sdks/python/apache_beam/io/gcp/gcsio.py 50.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff            @@
##             master   #33384   +/-   ##
=========================================
  Coverage     57.38%   57.39%           
  Complexity     1475     1475           
=========================================
  Files           973      973           
  Lines        154978   154997   +19     
  Branches       1076     1076           
=========================================
+ Hits          88939    88956   +17     
- Misses        63829    63831    +2     
  Partials       2210     2210           
Flag Coverage Δ
python 81.27% <52.63%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@@ -259,7 +259,15 @@ def splittable(self):
return self._splittable


def _is_decompressive_transcoding_enabled(file_path):

return True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

?

(am I parsing this right? it seems like a function definition at the top level but with a leading underscore and the body of the function is a stub)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I will have to clean this up when I move the draft to review ready ;)

@@ -945,3 +945,6 @@ def report_lineage(self, path, unused_lineage, level=None):
Unless override by FileSystem implementations, default to no-op.
"""
pass

def check_splittability(self, path):
return True
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should probably not always be true. If this is a default, perhaps it should not have a default but be abstract and we implement for various filesystems. If it is the default, comment so we understand that is why it ignores the argument.

def check_splittability(self, path):
try:
file_metadata = self._gcsIO()._status(path)
if file_metadata.get('content_encoding', None) == 'gzip':
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't the content-type also have to be a particular thing in addition to the content-encoding being set to gzip?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Bug]: ReadAllFiles does not fully read gzipped files from GCS
2 participants