Fix limit and add subquerying #1242
base: main
Conversation
dhruvkaliraman7
commented
Mar 27, 2025
- Fix limit so that it does not count metadata documents.
- Add hacky sub-querying for queries where you want to limit on the unique values of a field rather than on each row.
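Illustrative only: the difference between a per-row limit and a limit on unique field values, sketched with plain dicts rather than the actual Sycamore API (field name and data are made up).

```python
rows = [{"state": s} for s in ["CA", "CA", "NY", "NY", "TX"]]

# Per-row limit of 2: just the first 2 rows.
row_limited = rows[:2]

# Limit of 2 on unique field values: keep every row whose
# value is among the first 2 distinct states seen.
value_limited, seen = [], set()
for r in rows:
    if r["state"] not in seen:
        seen.add(r["state"])
        if len(seen) > 2:
            break
    value_limited.append(r)
```

Here `row_limited` has 2 rows (both CA), while `value_limited` has all 4 CA/NY rows.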
I'm not sure this does what you want it to do?
```python
return dataset.limit(self._limit)

rayDocs = []
if self._field:
    unique_docs = set()
```
stupid naming nit: unique_field_values
```python
if self._field:
    field_val = dotted_lookup(deser_doc, self._field)
    if field_val is not None and field_val not in unique_docs:
        unique_docs.add(field_val)
        if len(unique_docs) > self._limit:
            break
    rayDocs.append(doc)
else:
    if not isinstance(deser_doc, MetadataDocument):
        count += 1
        if count > self._limit:
            break
    rayDocs.append(doc)
```
I'm worried that this logic might not be right.
Case 1: field is set to something; expected behavior is to take 5 documents with different values.
What I get: 7 documents, of which 3 share the same value, even though my docset has hundreds of duplicates of each value (because the 8th document gave me my 6th unique field value).
Case 2: field is set to something; expected behavior is to take all documents with one of 5 different values.
Same issue, but the other way around.
Case 3: field is unset; expected behavior is to get 5 documents.
What I get: 5 documents, correctly, but a random subset of the metadata documents. Probably I want either all the metadata docs or just the ones pertaining to the documents I have selected.
Case 1: I used the Limit node with a field in the Finra env after a Sort node, and made sure to get all instances of each unique value.
Case 3: I thought about this. Is the correct behavior to go through all docs and then find matching lineage IDs to get the correct metadata docs?
I wonder if the thing to do, then, is to make this an entirely different docset method, call it limit_by_field, and have it tack on both the sort and the custom uniqueness limit you have here.
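A minimal sketch of what a limit_by_field with those semantics might look like, assuming the sort-then-keep behavior described above; plain dicts and the field name are hypothetical stand-ins, not the Sycamore API:

```python
def limit_by_field(docs, field, limit):
    """Sort by `field`, then keep every document whose field value
    is among the first `limit` unique values."""
    docs = sorted(docs, key=lambda d: d[field])
    kept, seen = [], set()
    for doc in docs:
        val = doc[field]
        if val not in seen:
            if len(seen) == limit:
                break  # would start a (limit+1)-th unique value
            seen.add(val)
        kept.append(doc)
    return kept

rows = [{"ticker": t} for t in ["B", "A", "C", "A", "B"]]
kept = limit_by_field(rows, "ticker", 2)
```

Because the sort groups duplicates together, this keeps all rows for the 2 smallest ticker values (A and B) and drops C.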
```python
for doc in all_docs:
    if self._field:
        field_val = dotted_lookup(doc, self._field)
        if field_val is not None and field_val not in unique_docs:
            unique_docs.add(field_val)
            if len(unique_docs) > self._limit:
                break
        filtered_docs.append(doc)
    else:
        if not isinstance(doc, MetadataDocument):
            count += 1
            if count > self._limit:
                break
        filtered_docs.append(doc)
```
might be able to factor out the inner logic like so:

```python
def limit_iterable(doc_iterable):
    for doc in doc_iterable:
        ...

def execute() -> "Dataset":
    # map takes the function first, then the iterable
    doc_iter = map(lambda row: Document.deserialize(row["doc"]), dataset.iter_rows())
    limit_iterable(doc_iter)

def local_execute() -> list[Document]:
    doc_iter = all_docs
    limit_iterable(doc_iter)
```
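Fleshing out that sketch, a runnable version of the factored-out logic, using plain dicts as hypothetical stand-ins for Document/MetadataDocument (an `is_metadata` key replaces the isinstance check; all names are illustrative):

```python
def limit_iterable(docs, limit, field=None):
    """Yield docs until the limit is reached.

    With `field`, the limit counts distinct field values;
    without it, the limit counts non-metadata documents.
    """
    unique_field_values = set()
    count = 0
    for doc in docs:
        if field is not None:
            val = doc.get(field)
            if val is not None and val not in unique_field_values:
                unique_field_values.add(val)
                if len(unique_field_values) > limit:
                    return  # the doc starting the (limit+1)-th value is dropped
            yield doc
        else:
            if not doc.get("is_metadata", False):
                count += 1
                if count > limit:
                    return
            yield doc

docs = [{"company": c} for c in ["A", "A", "B", "B", "C"]]
limited = list(limit_iterable(docs, limit=2, field="company"))
```

Both execute() and local_execute() could then consume this one generator, so the Ray path and the local path can't drift apart. Note it still inherits the Case 1 behavior discussed above: duplicates of already-seen values pass through until a new unique value exceeds the limit.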