@@ -453,18 +453,22 @@ julia> combine(gd, :, AsTable(Not(:a)) => sum, renamecols=false)
453453``` 
454454""" 
455455function  combine (f:: Base.Callable , gd:: GroupedDataFrame ;
456-                  keepkeys:: Bool = true , ungroup:: Bool = true , renamecols:: Bool = true )
456+                  keepkeys:: Bool = true , ungroup:: Bool = true , renamecols:: Bool = true ,
457+                  nthreads:: Int = NTHREADS)
457458    return  combine_helper (f, gd, keepkeys= keepkeys, ungroup= ungroup,
458-                           copycols= true , keeprows= false , renamecols= renamecols)
459+                           copycols= true , keeprows= false , renamecols= renamecols,
460+                           nthreads= nthreads)
459461end 
460462
461463combine (f:: typeof (nrow), gd:: GroupedDataFrame ;
462-         keepkeys:: Bool = true , ungroup:: Bool = true , renamecols:: Bool = true ) = 
464+         keepkeys:: Bool = true , ungroup:: Bool = true , renamecols:: Bool = true ,
465+         nthreads:: Int = NTHREADS) = 
463466    combine (gd, [nrow =>  :nrow ], keepkeys= keepkeys, ungroup= ungroup,
464467            renamecols= renamecols)
465468
466469function  combine (p:: Pair , gd:: GroupedDataFrame ;
467-                  keepkeys:: Bool = true , ungroup:: Bool = true , renamecols:: Bool = true )
470+                  keepkeys:: Bool = true , ungroup:: Bool = true , renamecols:: Bool = true ,
471+                  nthreads:: Int = NTHREADS)
468472    #  move handling of aggregate to specialized combine
469473    p_from, p_to =  p
470474
@@ -484,20 +488,24 @@ function combine(p::Pair, gd::GroupedDataFrame;
484488        cs =  p_from
485489    end 
486490    return  combine_helper (cs =>  p_to, gd, keepkeys= keepkeys, ungroup= ungroup,
487-                           copycols= true , keeprows= false , renamecols= renamecols)
491+                           copycols= true , keeprows= false , renamecols= renamecols,
492+                           nthreads= nthreads)
488493end 
489494
490495combine (gd:: GroupedDataFrame ,
491496        cs:: Union{Pair, typeof(nrow), ColumnIndex, MultiColumnIndex} ...;
492-         keepkeys:: Bool = true , ungroup:: Bool = true , renamecols:: Bool = true ) = 
497+         keepkeys:: Bool = true , ungroup:: Bool = true , renamecols:: Bool = true ,
498+         nthreads:: Int = NTHREADS) = 
493499    _combine_prepare (gd, cs... , keepkeys= keepkeys, ungroup= ungroup,
494-                      copycols= true , keeprows= false , renamecols= renamecols)
500+                      copycols= true , keeprows= false , renamecols= renamecols,
501+                      nthreads= nthreads)
495502
496503function  _combine_prepare (gd:: GroupedDataFrame ,
497504                          @nospecialize (cs:: Union {Pair, typeof (nrow),
498505                                        ColumnIndex, MultiColumnIndex}. .. );
499506                          keepkeys:: Bool , ungroup:: Bool , copycols:: Bool ,
500-                           keeprows:: Bool , renamecols:: Bool )
507+                           keeprows:: Bool , renamecols:: Bool ,
508+                           nthreads:: Int )
501509    cs_vec =  []
502510    for  p in  cs
503511        if  p ===  nrow
@@ -570,7 +578,8 @@ function _combine_prepare(gd::GroupedDataFrame,
570578    f =  Pair[first (x) =>  first (last (x)) for  x in  cs_norm]
571579    nms =  Symbol[last (last (x)) for  x in  cs_norm]
572580    return  combine_helper (f, gd, nms, keepkeys= keepkeys, ungroup= ungroup,
573-                           copycols= copycols, keeprows= keeprows, renamecols= renamecols)
581+                           copycols= copycols, keeprows= keeprows, renamecols= renamecols,
582+                           nthreads= nthreads)
574583end 
575584
576585function  gen_groups (idx:: Vector{Int} )
@@ -590,11 +599,12 @@ end
590599function  combine_helper (f, gd:: GroupedDataFrame ,
591600                        nms:: Union{AbstractVector{Symbol},Nothing} = nothing ;
592601                        keepkeys:: Bool , ungroup:: Bool ,
593-                         copycols:: Bool , keeprows:: Bool , renamecols:: Bool )
602+                         copycols:: Bool , keeprows:: Bool , renamecols:: Bool ,
603+                         nthreads:: Int )
594604    if  ! ungroup &&  ! keepkeys
595605        throw (ArgumentError (" keepkeys=false when ungroup=false is not allowed"  ))
596606    end 
597-     idx, valscat =  _combine (f, gd, nms, copycols, keeprows, renamecols)
607+     idx, valscat =  _combine (f, gd, nms, copycols, keeprows, renamecols, nthreads )
598608    ! keepkeys &&  ungroup &&  return  valscat
599609    keys =  groupcols (gd)
600610    for  key in  keys
@@ -985,24 +995,72 @@ function copyto_widen!(res::AbstractVector{T}, x::AbstractVector) where T
985995end 
986996
987997function  groupreduce! (res:: AbstractVector , f, op, condf, adjust, checkempty:: Bool ,
988-                       incol:: AbstractVector , gd:: GroupedDataFrame )
998+                       incol:: AbstractVector , gd:: GroupedDataFrame ; nthreads :: Int )
989999    n =  length (gd)
1000+     groups =  gd. groups
9901001    if  adjust != =  nothing  ||  checkempty
9911002        counts =  zeros (Int, n)
9921003    end 
993-     groups =  gd. groups
994-     @inbounds  for  i in  eachindex (incol, groups)
995-         gix =  groups[i]
996-         x =  incol[i]
997-         if  gix >  0  &&  (condf ===  nothing  ||  condf (x))
998-             #  this check should be optimized out if U is not Any
999-             if  eltype (res) ===  Any &&  ! isassigned (res, gix)
1000-                 res[gix] =  f (x, gix)
1001-             else 
1002-                 res[gix] =  op (res[gix], f (x, gix))
1004+     nt =  min (nthreads, Threads. nthreads ())
1005+     if  nt <=  1  ||  axes (incol) !=  axes (groups)
1006+         @inbounds  for  i in  eachindex (incol, groups)
1007+             gix =  groups[i]
1008+             x =  incol[i]
1009+             if  gix >  0  &&  (condf ===  nothing  ||  condf (x))
1010+                 #  this check should be optimized out if U is not Any
1011+                 if  eltype (res) ===  Any &&  ! isassigned (res, gix)
1012+                     res[gix] =  f (x, gix)
1013+                 else 
1014+                     res[gix] =  op (res[gix], f (x, gix))
1015+                 end 
1016+                 if  adjust != =  nothing  ||  checkempty
1017+                     counts[gix] +=  1 
1018+                 end 
10031019            end 
1020+         end 
1021+     else 
1022+         res_vec =  Vector {typeof(res)} (undef, nt)
1023+         #  needs to be always allocated to fix type instability with @threads
1024+         counts_vec =  Vector {Vector{Int}} (undef, nt)
1025+         res_vec[1 ] =  res
1026+         if  adjust != =  nothing  ||  checkempty
1027+             counts_vec[1 ] =  counts
1028+         end 
1029+         for  i in  2 : nt
1030+             res_vec[i] =  copy (res)
10041031            if  adjust != =  nothing  ||  checkempty
1005-                 counts[gix] +=  1 
1032+                 counts_vec[i] =  zeros (Int, n)
1033+             end 
1034+         end 
1035+         Threads. @threads  for  tid in  1 : nt
1036+             res′ =  res_vec[tid]
1037+             if  adjust != =  nothing  ||  checkempty
1038+                 counts′ =  counts_vec[tid]
1039+             end 
1040+             start =  1  +  ((tid -  1 ) *  length (groups)) ÷  nt
1041+             stop =  (tid *  length (groups)) ÷  nt
1042+             @inbounds  for  i in  start: stop
1043+                 gix =  groups[i]
1044+                 x =  incol[i]
1045+                 if  gix >  0  &&  (condf ===  nothing  ||  condf (x))
1046+                     #  this check should be optimized out if U is not Any
1047+                     if  eltype (res′) ===  Any &&  ! isassigned (res′, gix)
1048+                         res′[gix] =  f (x, gix)
1049+                     else 
1050+                         res′[gix] =  op (res′[gix], f (x, gix))
1051+                     end 
1052+                     if  adjust != =  nothing  ||  checkempty
1053+                         counts′[gix] +=  1 
1054+                     end 
1055+                 end 
1056+             end 
1057+         end 
1058+         for  i in  2 : length (res_vec)
1059+             res .=  op .(res, res_vec[i])
1060+         end 
1061+         if  adjust != =  nothing  ||  checkempty
1062+             for  i in  2 : length (counts_vec)
1063+                 counts .+ =  counts_vec[i]
10061064            end 
10071065        end 
10081066    end 
@@ -1042,26 +1100,31 @@ end
10421100
10431101#  function barrier works around type instability of groupreduce_init due to applicable
10441102groupreduce (f, op, condf, adjust, checkempty:: Bool ,
1045-             incol:: AbstractVector , gd:: GroupedDataFrame ) = 
1103+             incol:: AbstractVector , gd:: GroupedDataFrame ;
1104+             nthreads:: Int ) = 
10461105    groupreduce! (groupreduce_init (op, condf, adjust, incol, gd),
1047-                  f, op, condf, adjust, checkempty, incol, gd)
1106+                  f, op, condf, adjust, checkempty, incol, gd, nthreads = nthreads )
10481107#  Avoids the overhead due to Missing when computing reduction
10491108groupreduce (f, op, condf:: typeof (! ismissing), adjust, checkempty:: Bool ,
1050-             incol:: AbstractVector , gd:: GroupedDataFrame ) = 
1109+             incol:: AbstractVector , gd:: GroupedDataFrame ;
1110+             nthreads:: Int ) = 
10511111    groupreduce! (disallowmissing (groupreduce_init (op, condf, adjust, incol, gd)),
1052-                  f, op, condf, adjust, checkempty, incol, gd)
1112+                  f, op, condf, adjust, checkempty, incol, gd, nthreads = nthreads )
10531113
1054- (r:: Reduce )(incol:: AbstractVector , gd:: GroupedDataFrame ) = 
1055-     groupreduce ((x, i) ->  x, r. op, r. condf, r. adjust, r. checkempty, incol, gd)
1114+ (r:: Reduce )(incol:: AbstractVector , gd:: GroupedDataFrame ; nthreads:: Int = NTHREADS) = 
1115+     groupreduce ((x, i) ->  x, r. op, r. condf, r. adjust, r. checkempty, incol, gd,
1116+                 nthreads= nthreads)
10561117
10571118#  this definition is missing in Julia 1.0 LTS and is required by aggregation for var
10581119#  TODO : remove this when we drop 1.0 support
10591120if  VERSION  <  v " 1.1" 
10601121    Base. zero (:: Type{Missing} ) =  missing 
10611122end 
10621123
1063- function  (agg:: Aggregate{typeof(var)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
1064-     means =  groupreduce ((x, i) ->  x, Base. add_sum, agg. condf, / , false , incol, gd)
1124+ function  (agg:: Aggregate{typeof(var)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
1125+                                        nthreads:: Int = NTHREADS)
1126+     means =  groupreduce ((x, i) ->  x, Base. add_sum, agg. condf, / , false , incol, gd,
1127+                         nthreads= nthreads)
10651128    #  !ismissing check is purely an optimization to avoid a copy later
10661129    if  eltype (means) >:  Missing  &&  agg. condf != =  ! ismissing
10671130        T =  Union{Missing, real (eltype (means))}
@@ -1071,10 +1134,11 @@ function (agg::Aggregate{typeof(var)})(incol::AbstractVector, gd::GroupedDataFra
10711134    res =  zeros (T, length (gd))
10721135    return  groupreduce! (res, (x, i) ->  @inbounds (abs2 (x -  means[i])), + , agg. condf,
10731136                        (x, l) ->  l <=  1  ?  oftype (x /  (l- 1 ), NaN ) :  x /  (l- 1 ),
1074-                         false , incol, gd)
1137+                         false , incol, gd, nthreads = nthreads )
10751138end 
10761139
1077- function  (agg:: Aggregate{typeof(std)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
1140+ function  (agg:: Aggregate{typeof(std)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
1141+                                        nthreads:: Int = NTHREADS)
10781142    outcol =  Aggregate (var, agg. condf)(incol, gd)
10791143    if  eltype (outcol) <:  Union{Missing, Rational} 
10801144        return  sqrt .(outcol)
@@ -1083,20 +1147,25 @@ function (agg::Aggregate{typeof(std)})(incol::AbstractVector, gd::GroupedDataFra
10831147    end 
10841148end 
10851149
1086- for  f in  (first, last)
1087-     function  (agg:: Aggregate{typeof(f)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
1088-         n =  length (gd)
1089-         outcol =  similar (incol, n)
1090-         fillfirst! (agg. condf, outcol, incol, gd, rev= agg. f ===  last)
1091-         if  isconcretetype (eltype (outcol))
1092-             return  outcol
1093-         else 
1094-             return  copyto_widen! (Tables. allocatecolumn (typeof (first (outcol)), n), outcol)
1150+ for  f in  (:first , :last )
1151+     #  Without using @eval the presence of a keyword argument triggers a Julia bug
1152+     @eval  begin 
1153+         function  (agg:: Aggregate{typeof($f)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
1154+                                               nthreads:: Int = NTHREADS)
1155+             n =  length (gd)
1156+             outcol =  similar (incol, n)
1157+             fillfirst! (agg. condf, outcol, incol, gd, rev= agg. f ===  last)
1158+             if  isconcretetype (eltype (outcol))
1159+                 return  outcol
1160+             else 
1161+                 return  copyto_widen! (Tables. allocatecolumn (typeof (first (outcol)), n), outcol)
1162+             end 
10951163        end 
10961164    end 
10971165end 
10981166
1099- function  (agg:: Aggregate{typeof(length)} )(incol:: AbstractVector , gd:: GroupedDataFrame )
1167+ function  (agg:: Aggregate{typeof(length)} )(incol:: AbstractVector , gd:: GroupedDataFrame ;
1168+                                           nthreads:: Int = NTHREADS)
11001169    if  getfield (gd, :idx ) ===  nothing 
11011170        lens =  zeros (Int, length (gd))
11021171        @inbounds  for  gix in  gd. groups
@@ -1143,7 +1212,7 @@ end
11431212
11441213function  _combine (f:: AbstractVector{<:Pair} ,
11451214                  gd:: GroupedDataFrame , nms:: AbstractVector{Symbol} ,
1146-                   copycols:: Bool , keeprows:: Bool , renamecols:: Bool )
1215+                   copycols:: Bool , keeprows:: Bool , renamecols:: Bool , nthreads :: Int )
11471216    #  here f should be normalized and in a form of source_cols => fun
11481217    @assert  all (x ->  first (x) isa  Union{Int, AbstractVector{Int}, AsTable}, f)
11491218    @assert  all (x ->  last (x) isa  Base. Callable, f)
@@ -1185,7 +1254,7 @@ function _combine(f::AbstractVector{<:Pair},
11851254        if  length (gd) >  0  &&  isagg (p, gd)
11861255            incol =  parentdf[! , source_cols]
11871256            agg =  check_aggregate (last (p), incol)
1188-             outcol =  agg (incol, gd)
1257+             outcol =  agg (incol, gd, nthreads = nthreads )
11891258            res[i] =  idx_agg, outcol
11901259        elseif  keeprows &&  fun ===  identity &&  ! (source_cols isa  AsTable)
11911260            @assert  source_cols isa  Union{Int, AbstractVector{Int}}
@@ -1283,7 +1352,7 @@ function _combine(f::AbstractVector{<:Pair},
12831352end 
12841353
12851354function  _combine (fun:: Base.Callable , gd:: GroupedDataFrame , :: Nothing ,
1286-                   copycols:: Bool , keeprows:: Bool , renamecols:: Bool )
1355+                   copycols:: Bool , keeprows:: Bool , renamecols:: Bool , nthreads :: Int )
12871356    @assert  copycols &&  ! keeprows
12881357    #  use `similar` as `gd` might have been subsetted
12891358    firstres =  length (gd) >  0  ?  fun (gd[1 ]) :  fun (similar (parent (gd), 0 ))
@@ -1293,7 +1362,7 @@ function _combine(fun::Base.Callable, gd::GroupedDataFrame, ::Nothing,
12931362end 
12941363
12951364function  _combine (p:: Pair , gd:: GroupedDataFrame , :: Nothing ,
1296-                   copycols:: Bool , keeprows:: Bool , renamecols:: Bool )
1365+                   copycols:: Bool , keeprows:: Bool , renamecols:: Bool , nthreads :: Int )
12971366    #  here p should not be normalized as we allow tabular return value from fun
12981367    #  map and combine should not dispatch here if p is isagg
12991368    @assert  copycols &&  ! keeprows
@@ -1708,9 +1777,10 @@ julia> select(gd, :, AsTable(Not(:a)) => sum, renamecols=false)
17081777``` 
17091778""" 
17101779select (gd:: GroupedDataFrame , args... ; copycols:: Bool = true , keepkeys:: Bool = true ,
1711-        ungroup:: Bool = true , renamecols:: Bool = true ) = 
1780+        ungroup:: Bool = true , renamecols:: Bool = true , nthreads :: Int = NTHREADS ) = 
17121781    _combine_prepare (gd, args... , copycols= copycols, keepkeys= keepkeys,
1713-                      ungroup= ungroup, keeprows= true , renamecols= renamecols)
1782+                      ungroup= ungroup, keeprows= true , renamecols= renamecols,
1783+                      nthreads= NTHREADS)
17141784
17151785""" 
17161786    transform(gd::GroupedDataFrame, args...; 
0 commit comments