@@ -590,49 +590,60 @@ def _make_unwind_project_stage(only: list):
590590        }
591591
592592    @classmethod  
593-     def  _stat_with_unwind (
593+     def  _stat_with_pipeline (
594594        cls ,
595-         unwind : list ,
595+         lookup : list  =  None ,
596+         unwind : dict  =  None ,
597+         add_fields : dict  =  None ,
596598        only : list  =  None ,
597599        filter : list  =  None ,
598600        filter_or : list  =  None ,
599601        sort : list  =  None ,
600602        page : dict  =  None ,
601603        target : str  =  None ,
602604    ):
603-         if  only  is  None :
604-             raise  ERROR_DB_QUERY (reason = "unwind option requires only option." )
605+         if  unwind :
606+             if  only  is  None :
607+                 raise  ERROR_DB_QUERY (reason = "unwind option requires only option." )
605608
606-         if  not  isinstance (unwind , dict ):
607-             raise  ERROR_DB_QUERY (reason = "unwind option should be dict type." )
609+              if  not  isinstance (unwind , dict ):
610+                  raise  ERROR_DB_QUERY (reason = "unwind option should be dict type." )
608611
609-         if  "path"  not  in   unwind :
610-             raise  ERROR_DB_QUERY (reason = "unwind option should have path key." )
612+              if  "path"  not  in   unwind :
613+                  raise  ERROR_DB_QUERY (reason = "unwind option should have path key." )
611614
612-         unwind_path  =  unwind ["path" ]
613-         aggregate  =  [{"unwind" : unwind }]
615+         aggregate  =  []
614616
615-         # Add project stage 
616-         project_fields  =  []
617-         for  key  in  only :
618-             project_fields .append (
617+         if  lookup :
618+             for  lu  in  lookup :
619+                 aggregate .append ({"lookup" : lu })
620+ 
621+         if  unwind :
622+             aggregate .append ({"unwind" : unwind })
623+ 
624+         if  add_fields :
625+             aggregate .append ({"add_fields" : add_fields })
626+ 
627+         if  only :
628+             project_fields  =  []
629+             for  key  in  only :
630+                 project_fields .append (
631+                     {
632+                         "key" : key ,
633+                         "name" : key ,
634+                     }
635+                 )
636+ 
637+             aggregate .append (
619638                {
620-                     "key" : key ,
621-                     "name" : key ,
639+                     "project" : {
640+                         "exclude_keys" : True ,
641+                         "only_keys" : True ,
642+                         "fields" : project_fields ,
643+                     }
622644                }
623645            )
624646
625-         aggregate .append (
626-             {
627-                 "project" : {
628-                     "exclude_keys" : True ,
629-                     "only_keys" : True ,
630-                     "fields" : project_fields ,
631-                 }
632-             }
633-         )
634- 
635-         # Add sort stage 
636647        if  sort :
637648            aggregate .append ({"sort" : sort })
638649
@@ -641,21 +652,23 @@ def _stat_with_unwind(
641652            filter = filter ,
642653            filter_or = filter_or ,
643654            page = page ,
644-             tageet = target ,
655+             target = target ,
645656            allow_disk_use = True ,
646657        )
647658
648659        try :
649660            vos  =  []
650661            total_count  =  response .get ("total_count" , 0 )
651662            for  result  in  response .get ("results" , []):
652-                 unwind_data  =  utils .get_dict_value (result , unwind_path )
653-                 result  =  utils .change_dict_value (result , unwind_path , [unwind_data ])
663+                 if  unwind :
664+                     unwind_path  =  unwind ["path" ]
665+                     unwind_data  =  utils .get_dict_value (result , unwind_path )
666+                     result  =  utils .change_dict_value (result , unwind_path , [unwind_data ])
654667
655668                vo  =  cls (** result )
656669                vos .append (vo )
657670        except  Exception  as  e :
658-             raise  ERROR_DB_QUERY (reason = f"Failed to convert unwind  result: { e }  " )
671+             raise  ERROR_DB_QUERY (reason = f"Failed to convert pipeline  result: { e }  " )
659672
660673        return  vos , total_count 
661674
@@ -672,7 +685,9 @@ def query(
672685        minimal = False ,
673686        include_count = True ,
674687        count_only = False ,
688+         lookup = None ,
675689        unwind = None ,
690+         add_fields = None ,
676691        reference_filter = None ,
677692        target = None ,
678693        hint = None ,
@@ -683,9 +698,17 @@ def query(
683698        sort  =  sort  or  []
684699        page  =  page  or  {}
685700
686-         if  unwind :
687-             return  cls ._stat_with_unwind (
688-                 unwind , only , filter , filter_or , sort , page , target 
701+         if  unwind  or  lookup  or  add_fields :
702+             return  cls ._stat_with_pipeline (
703+                 lookup = lookup ,
704+                 unwind = unwind ,
705+                 add_fields = add_fields ,
706+                 only = only ,
707+                 filter = filter ,
708+                 filter_or = filter_or ,
709+                 sort = sort ,
710+                 page = page ,
711+                 target = target ,
689712            )
690713
691714        else :
@@ -1599,7 +1622,8 @@ def analyze(
15991622        aggregate  =  []
16001623
16011624        if  lookup :
1602-             aggregate .append ({"lookup" : lookup })
1625+             for  lu  in  lookup :
1626+                 aggregate .append ({"lookup" : lu })
16031627
16041628        if  unwind :
16051629            aggregate .append ({"unwind" : unwind })
0 commit comments