diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py index fac7f8bc4bce..328e64155155 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/base_pipeline.py @@ -25,6 +25,8 @@ Expression, Field, Selectable, + FunctionExpression, + _PipelineValueExpression, ) from google.cloud.firestore_v1.types.pipeline import ( StructuredPipeline as StructuredPipeline_pb, @@ -90,6 +92,51 @@ def _to_pb(self, **options) -> StructuredPipeline_pb: options=options, ) + def to_array_expression(self) -> Expression: + """ + Converts this Pipeline into an expression that evaluates to an array of results. + Used for embedding 1:N subqueries into stages like `add_fields`. + + Example: + >>> # Get a list of all reviewer names for each book + >>> db.pipeline().collection("books").define(Field.of("id").as_("book_id")).add_fields( + ... db.pipeline().collection("reviews") + ... .where(Field.of("book_id").equal(Variable("book_id"))) + ... .select(Field.of("reviewer").as_("name")) + ... .to_array_expression().as_("reviewers") + ... ) + + Returns: + An :class:`Expression` representing the execution of this pipeline. + """ + return FunctionExpression("array", [_PipelineValueExpression(self)]) + + def to_scalar_expression(self) -> Expression: + """ + Converts this Pipeline into an expression that evaluates to a single scalar result. + Used for 1:1 lookups or Aggregations when the subquery is expected to return a single value or object. + + **Result Unwrapping:** + For simpler access, scalar subqueries producing a single field automatically unwrap that value to the + top level, ignoring the inner alias. If the subquery returns multiple fields, they are preserved as a map. 
+ + Example: + >>> # Calculate average rating for each restaurant using a subquery + >>> db.pipeline().collection("restaurants").define(Field.of("id").as_("rid")).add_fields( + ... db.pipeline().collection("reviews") + ... .where(Field.of("restaurant_id").equal(Variable("rid"))) + ... .aggregate(AggregateFunction.average("rating").as_("value")) + ... .to_scalar_expression().as_("average_rating") + ... ) + + Raises: + RuntimeError: If the result set contains more than one item. If the pipeline has zero results, it evaluates to `null` instead of raising an error. + + Returns: + An :class:`Expression` representing the execution of this pipeline. + """ + return FunctionExpression("scalar", [_PipelineValueExpression(self)]) + def _append(self, new_stage): """ Create a new Pipeline object with a new stage appended @@ -610,3 +657,27 @@ def distinct(self, *fields: str | Selectable) -> "_BasePipeline": A new Pipeline object with this stage appended to the stage list """ return self._append(stages.Distinct(*fields)) + + def define(self, *aliased_expressions: AliasedExpression) -> "_BasePipeline": + """ + Binds one or more expressions to Variables that can be accessed in subsequent stages + or inner subqueries using `Variable`. + + Each Variable is defined using an :class:`AliasedExpression`, which pairs an expression with + a name (alias). + + Example: + >>> db.pipeline().collection("products").define( + ... Field.of("price").multiply(0.9).as_("discountedPrice"), + ... Field.of("stock").add(10).as_("newStock") + ... ).where( + ... Variable("discountedPrice").less_than(100) + ... ).select(Field.of("name"), Variable("newStock")) + + Args: + *aliased_expressions: One or more :class:`AliasedExpression` defining the Variable names and values. + + Returns: + A new Pipeline object with this stage appended to the stage list. 
+ """ + return self._append(stages.Define(*aliased_expressions)) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py index 1833d2ffbac0..7bd5f6a60691 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_expressions.py @@ -31,6 +31,7 @@ from google.cloud.firestore_v1._helpers import GeoPoint, decode_value, encode_value from google.cloud.firestore_v1.types.document import Value +from google.cloud.firestore_v1.types.document import Pipeline as Pipeline_pb from google.cloud.firestore_v1.types.query import StructuredQuery as Query_pb from google.cloud.firestore_v1.vector import Vector @@ -2634,3 +2635,53 @@ class Rand(FunctionExpression): def __init__(self): super().__init__("rand", [], use_infix_repr=False) + + +class Variable(Expression): + """ + Creates an expression that retrieves the value of a variable bound via `Pipeline.define`. + + Example: + >>> # Define a variable "discountedPrice" and use it in a filter + >>> db.pipeline().collection("products").define( + ... Field.of("price").multiply(0.9).as_("discountedPrice") + ... ).where(Variable("discountedPrice").less_than(100)) + + Args: + name: The name of the variable to retrieve. 
+ """ + + def __init__(self, name: str): + self.name = name + + def _to_pb(self) -> Value: + return Value(variable_reference_value=self.name) + + +class _PipelineValueExpression(Expression): + """Internal wrapper to represent a pipeline as an expression.""" + + def __init__(self, pipeline: "google.cloud.firestore_v1.base_pipeline._BasePipeline"): + self.pipeline = pipeline + + def _to_pb(self) -> Value: + pipeline_pb = Pipeline_pb(stages=[s._to_pb() for s in self.pipeline.stages]) + return Value(pipeline_value=pipeline_pb) + + +class CurrentDocument(FunctionExpression): + """ + Creates an expression that represents the current document being processed. + + This acts as a handle, allowing you to bind the entire document to a variable or pass the + document itself to a function or subquery. + + Example: + >>> # Define the current document as a variable "doc" + >>> db.pipeline().collection("books").define( + ... CurrentDocument().as_("doc") + ... ).select(Variable("doc").get_field("title")) + """ + + def __init__(self): + super().__init__("current_document", []) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py index 7075797b3d57..5b16f7b67926 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_source.py @@ -171,3 +171,28 @@ def literals( A new Pipeline object with this stage appended to the stage list. """ return self._create_pipeline(stages.Literals(*documents)) + + def subcollection(self, path: str) -> PipelineType: + """ + Initializes a pipeline scoped to a subcollection. + + This method allows you to start a new pipeline that operates on a subcollection of the + current document. It is intended to be used as a subquery. + + **Note:** A pipeline created with `subcollection` cannot be executed directly using + `execute()`. 
It must be used within a parent pipeline. + + Example: + >>> db.pipeline().collection("books").add_fields( + ... db.pipeline().subcollection("reviews") + ... .aggregate(AggregateFunction.average("rating").as_("avg_rating")) + ... .to_scalar_expression().as_("average_rating") + ... ) + + Args: + path: The path of the subcollection. + + Returns: + A new :class:`Pipeline` instance scoped to the subcollection. + """ + return self._create_pipeline(stages.Subcollection(path)) diff --git a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py index cac9c70d4b99..7a2d8d3b08af 100644 --- a/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py +++ b/packages/google-cloud-firestore/google/cloud/firestore_v1/pipeline_stages.py @@ -494,3 +494,25 @@ def __init__(self, condition: BooleanExpression): def _pb_args(self): return [self.condition._to_pb()] + + +class Define(Stage): + """Binds one or more expressions to variables.""" + + def __init__(self, *expressions: AliasedExpression): + super().__init__("let") + self.expressions = list(expressions) + + def _pb_args(self) -> list[Value]: + return [Selectable._to_value(self.expressions)] + + +class Subcollection(Stage): + """Targets a subcollection relative to the current document.""" + + def __init__(self, path: str): + super().__init__("subcollection") + self.path = path + + def _pb_args(self) -> list[Value]: + return [encode_value(self.path)] diff --git a/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml new file mode 100644 index 000000000000..7eed48ea8e73 --- /dev/null +++ b/packages/google-cloud-firestore/tests/system/pipeline_e2e/subqueries.yaml @@ -0,0 +1,180 @@ +tests: + - description: array_subquery_with_variable + pipeline: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: 
title + - Constant: "1984" + - Define: + - AliasedExpression: + - Field: genre + - target_genre + - AddFields: + - AliasedExpression: + - Pipeline.to_array_expression: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: genre + - Variable: target_genre + - Select: + - Field: title + - same_genre_books + - Select: + - Field: title + - Field: same_genre_books + assert_results: + - title: "1984" + same_genre_books: + - "1984" + - "The Handmaid's Tale" + assert_proto: + pipeline: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: title + - stringValue: '1984' + name: equal + name: where + - args: + - mapValue: + fields: + target_genre: + fieldReferenceValue: genre + name: let + - args: + - mapValue: + fields: + same_genre_books: + functionValue: + args: + - pipelineValue: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: genre + - variableReferenceValue: target_genre + name: equal + name: where + - args: + - mapValue: + fields: + title: + fieldReferenceValue: title + name: select + name: array + name: add_fields + - args: + - mapValue: + fields: + same_genre_books: + fieldReferenceValue: same_genre_books + title: + fieldReferenceValue: title + name: select + + - description: scalar_subquery_with_current_document + pipeline: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: title + - Constant: "1984" + - Define: + - AliasedExpression: + - CurrentDocument: + - doc + - AddFields: + - AliasedExpression: + - Pipeline.to_scalar_expression: + - Collection: books + - Where: + - FunctionExpression.equal: + - Field: genre + - FunctionExpression.map_get: + - Variable: doc + - Constant: "genre" + - Aggregate: + - AliasedExpression: + - FunctionExpression.average: + - Field: rating + - avg_rating + - average_rating + - Select: + - Field: title + - Field: average_rating + 
assert_results: + - title: "1984" + average_rating: 4.15 + assert_proto: + pipeline: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: title + - stringValue: '1984' + name: equal + name: where + - args: + - mapValue: + fields: + doc: + functionValue: + name: current_document + name: let + - args: + - mapValue: + fields: + average_rating: + functionValue: + args: + - pipelineValue: + stages: + - args: + - referenceValue: /books + name: collection + - args: + - functionValue: + args: + - fieldReferenceValue: genre + - functionValue: + args: + - variableReferenceValue: doc + - stringValue: genre + name: map_get + name: equal + name: where + - args: + - mapValue: + fields: + avg_rating: + functionValue: + args: + - fieldReferenceValue: rating + name: average + - mapValue: {} + name: aggregate + name: scalar + name: add_fields + - args: + - mapValue: + fields: + average_rating: + fieldReferenceValue: average_rating + title: + fieldReferenceValue: title + name: select diff --git a/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py b/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py index f1fd1326c765..81bad2e8c3cf 100644 --- a/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py +++ b/packages/google-cloud-firestore/tests/system/test_pipeline_acceptance.py @@ -258,6 +258,18 @@ def _parse_expressions(client, yaml_element: Any): # find Pipeline objects for Union expressions other_ppl = yaml_element["Pipeline"] return parse_pipeline(client, other_ppl) + elif ( + len(yaml_element) == 1 + and list(yaml_element)[0] == "Pipeline.to_array_expression" + ): + other_ppl = yaml_element["Pipeline.to_array_expression"] + return parse_pipeline(client, other_ppl).to_array_expression() + elif ( + len(yaml_element) == 1 + and list(yaml_element)[0] == "Pipeline.to_scalar_expression" + ): + other_ppl = yaml_element["Pipeline.to_scalar_expression"] + 
return parse_pipeline(client, other_ppl).to_scalar_expression() else: # otherwise, return dict return { @@ -286,6 +298,8 @@ def _apply_yaml_args_to_callable(callable_obj, client, yaml_args): ): # yaml has an array of arguments. Treat as args return callable_obj(*_parse_expressions(client, yaml_args)) + elif yaml_args is None: + return callable_obj() else: # yaml has a single argument return callable_obj(_parse_expressions(client, yaml_args))