
mongodb: Allow projection and mongoarrow schema #592

Open
zilto wants to merge 3 commits into master from fix/577-pymongoarrow-schema
Conversation

@zilto (Collaborator) commented Jan 12, 2025


Short description

This implements two sets of features:

  • Improve the default behavior for the ObjectId type and add pymongoarrow_schema, a user-facing type-casting API for MongoDB -> PyArrow.
  • Add a projection kwarg that selects the fields to include or exclude when pulling data from MongoDB. This reduces egress volume and helps with regulatory compliance (e.g., not pulling sensitive information). See the usage sketch after this list.
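A minimal usage sketch of both new arguments. The connection details and field names are placeholders; the argument names follow this PR, and Schema is pymongoarrow's schema class:

```python
import dlt
import pyarrow as pa
from pymongoarrow.schema import Schema

from mongodb import mongodb_collection

# Placeholder connection details; projection and pymongoarrow_schema are
# the two arguments added by this PR.
movies = mongodb_collection(
    connection_url="mongodb://localhost:27017",
    database="imdb",
    collection="movies",
    data_item_format="arrow",
    # include-style projection: pull only these fields from MongoDB
    projection=["title", "year"],
    # cast the extracted fields to explicit Arrow types
    pymongoarrow_schema=Schema({"title": pa.string(), "year": pa.int64()}),
)

pipeline = dlt.pipeline(pipeline_name="mongodb_movies", destination="duckdb")
pipeline.run(movies)
```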

Notes

  • The columns in pymongoarrow_schema can intersect with the column selection in projection; when they do, pymongoarrow_schema overrides the projection kwarg.
  • pymongoarrow_schema "fails silently" if you cast to an invalid type (e.g., ObjectId to int): see Avoid Type Mismatch Conversion to NaN mongodb-labs/mongo-arrow#246
  • With pymongoarrow, ObjectId is represented as a binary[12] type in Arrow. There's no vectorized operation to convert it to a string: the string form of an ObjectId is the hex encoding of its 12 raw bytes, and Arrow has no kernel for that (a fallback sketch follows this list).
  • Invalid types in pymongoarrow_schema make pipeline.run() fail with a hard-to-trace Out of buffer error coming from Arrow (e.g., _id: pyarrow.string()):
    E           dlt.pipeline.exceptions.PipelineStepFailed: Pipeline execution failed at stage load when processing package 1736799421.5290868 with exception:
    E           
    E           <class 'dlt.load.exceptions.LoadClientJobRetry'>
    E           Job for movies.e27d2e9dab.parquet had 5 retries which a multiple of 5. Exiting retry loop. You can still rerun the load package to retry this job. Last failure message was Out of buffer
  • pymongoarrow.schema.Schema validates that its type annotations are valid at __init__()
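As an illustration of the non-vectorized fallback mentioned above, a sketch that hex-encodes a binary[12] column in a Python-level loop (objectid_to_string is a hypothetical helper, not part of this PR):

```python
import pyarrow as pa

def objectid_to_string(col: pa.ChunkedArray) -> pa.ChunkedArray:
    # No Arrow kernel exists for this, so loop in Python: the canonical
    # ObjectId string is exactly the hex encoding of its 12 raw bytes.
    return pa.chunked_array(
        [
            pa.array(
                [v.as_py().hex() if v.is_valid else None for v in chunk],
                type=pa.string(),
            )
            for chunk in col.chunks
        ]
    )
```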

Future work

  • As pymongoarrow is updated, we can raise on failed type casting instead of failing silently.
  • Vectorize the operation to cast ObjectId to string.
  • If we can't vectorize type casting from ObjectId to string, we also can't efficiently handle nested arrays and structs without looping through each value.

Open questions

  1. projection was added to the mongodb_collection resource. Should it also be added to the mongodb source? Then the projection would be applied to all generated resources.
  2. projection + incremental loading: should we raise an exception if the primary_key is excluded, or force its inclusion and log a warning (the current implementation; see the sketch after this list)?
  3. pymongoarrow_schema is applied at extract time, and it's hard to surface schema issues to the user. How much validation should happen within the dlt source code vs. user code?
  4. Should we upgrade from pymongoarrow == 1.4.0 to == 1.6.0, given its better type support?
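On question 2, a minimal sketch of the forced-inclusion behavior; ensure_primary_key is a hypothetical helper name, and the actual logic lives in the collection loader:

```python
import logging

logger = logging.getLogger(__name__)

def ensure_primary_key(projection_dict: dict, primary_key: str = "_id") -> dict:
    # Hypothetical helper: make sure the primary key survives the projection,
    # since incremental loading needs it. Warn instead of raising.
    if projection_dict.get(primary_key) == 0:
        # exclude-style projection that excludes the primary key: drop the exclusion
        del projection_dict[primary_key]
        logger.warning(
            "Primary key '%s' was excluded by the projection; adding it back "
            "because it is required for incremental loading.", primary_key,
        )
    elif projection_dict and primary_key not in projection_dict and all(projection_dict.values()):
        # include-style projection that omits the primary key: force it in
        projection_dict[primary_key] = 1
        logger.warning(
            "Primary key '%s' was missing from the projection; including it "
            "because it is required for incremental loading.", primary_key,
        )
    return projection_dict
```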


@zilto requested a review from rudolfix on January 13, 2025 21:42
@rudolfix (Contributor) left a comment

this looks really good! and thanks for writing tests.
your questions:
1 - 2: answered in the review

pymongoarrow_schema is not yet committed, so I'll guess the answer to 3:

  • I think you can reuse the validation for projection to also validate columns in the mongo schema.
  • this is really a power-user feature, so we do not need to do deep validation
  • it is OK to fail in the extract phase, but we always try to provide useful exception messages for the most common cases. If you write tests properly you'll have those cases there, i.e. the schema has some crazy type mismatch or refers to unknown columns - just wrap those in 1-2 meaningful exceptions (Out of buffer is not really helpful)

Yeah, please upgrade pymongoarrow - AFAIK the newest arrow handles UUIDs as a type, so maybe we do not need to do the crazy duckdb conversion from the other ticket

@@ -90,6 +91,7 @@ def mongodb_collection(
chunk_size: Optional[int] = 10000,
data_item_format: Optional[TDataItemFormat] = "object",
filter_: Optional[Dict[str, Any]] = None,
projection: Optional[Union[Mapping[str, Any], Iterable[str]]] = None,
rudolfix (Contributor) commented on the diff:

  1. You should add projection to MongoDbCollectionResourceConfiguration so it becomes part of the configuration. For this particular resource we have an explicit config spec (not derived from the signature). This also lets users configure projection per collection, even when they use the mongodb source (see the sketch after this comment).
  2. We are using the legacy way to create resources with dynamic names. Could you try

@dlt.resource(
    name=lambda args: args["collection"], standalone=True, spec=MongoDbCollectionResourceConfiguration
)
def ...

  3. What is the reason for not mixing exclude and include projections? Is it a limitation of pymongo? If not, then IMO it makes sense to replace projection with include_projection and exclude_projection, both lists, and let the user specify both...
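For point 1, a sketch of what the extended config spec might look like, assuming dlt's @configspec decorator; the existing fields shown are guesses at the current spec, with projection being the new addition:

```python
from typing import Any, Iterable, Mapping, Optional, Union

from dlt.common.configuration.specs import BaseConfiguration, configspec

@configspec
class MongoDbCollectionResourceConfiguration(BaseConfiguration):
    connection_url: str = None          # guessed existing field
    database: Optional[str] = None      # guessed existing field
    collection: str = None              # guessed existing field
    # new field, mirroring the resource signature changed in this PR
    projection: Optional[Union[Mapping[str, Any], Iterable[str]]] = None
```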

@zilto (Collaborator, Author) replied:

  1. What is the reason for not mixing exclude and include projections?

The projection argument is implemented the same way as pymongo's (so users can copy-paste from their existing code). AFAIK, the query engine won't let you specify "include" and "exclude" clauses at once; it's either/or, so two kwargs include_projection and exclude_projection would make it ambiguous which one is used.
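For context, a sketch of the pymongo projection semantics this mirrors (connection and field names are placeholders):

```python
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder
movies = client["imdb"]["movies"]

# Include-style: only the listed fields (plus _id by default) come back.
movies.find({}, projection={"title": 1, "year": 1})
# pymongo also accepts the iterable form, like this PR's kwarg:
movies.find({}, projection=["title", "year"])

# Exclude-style: everything except the listed fields.
movies.find({}, projection={"plot": 0, "cast": 0})

# Mixing inclusion and exclusion in one projection (other than excluding
# _id alongside inclusions) is rejected by the server.
```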


projection_dict = dict(_fields_list_to_dict(projection, "projection"))

# NOTE we can still filter on primary_key if it's excluded from projection
rudolfix (Contributor) commented on the diff:

Mongo has an "_id" column which is a kind of primary key. Currently, when we create a resource for a collection, we set the primary_key to "_id". If that field is removed, dlt will later complain that the primary key does not assume any value, so I'd add it back with a warning (like you do below). We can do that way earlier (when we create the resource).

In the very rare case when someone sets the primary key to something else (on the resource / incremental), your code below is fine. Adding back the primary key and warning is good.

cool you thought about this edge case!

and in 99% of cases this is what you'll find in the incremental. Now the question is if

@zilto marked this pull request as ready for review on January 28, 2025 01:57
@zilto force-pushed the fix/577-pymongoarrow-schema branch from 2742c83 to 33c8063 on January 29, 2025 21:22
Development

Successfully merging this pull request may close these issues.

Add support for pymongoarrow schema and projection parameters in the mongo CollectionLoaders