diff --git a/fink_science/__init__.py b/fink_science/__init__.py index 713bac4c..7ee8e351 100644 --- a/fink_science/__init__.py +++ b/fink_science/__init__.py @@ -12,4 +12,4 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -__version__ = "5.9.0" +__version__ = "5.10.0" diff --git a/fink_science/asteroids/processor.py b/fink_science/asteroids/processor.py index 6b97e4ed..53ceec31 100644 --- a/fink_science/asteroids/processor.py +++ b/fink_science/asteroids/processor.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from pyspark.sql.functions import pandas_udf, PandasUDFType -from pyspark.sql.types import IntegerType +from pyspark.sql.functions import pandas_udf from fink_science import __file__ import os @@ -21,11 +20,62 @@ import pandas as pd import numpy as np -from fink_science.tester import spark_unit_tests +from pyspark.sql.types import ( + IntegerType, + ArrayType, + FloatType, + StructType, + StructField, + StringType, +) -@pandas_udf(IntegerType(), PandasUDFType.SCALAR) -def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1): - """ Determine if an alert is a potential Solar System object (SSO) using two criteria: +from fink_science.tester import spark_unit_tests +from fink_fat.streaming_associations.fink_fat_associations import fink_fat_association + + +roid_schema = StructType( + [ + StructField( + "roid", + IntegerType(), + True, + ), + StructField( + "ffdistnr", + ArrayType(FloatType()), + True, + ), + StructField( + "estimator_id", + ArrayType(StringType()), + True, + ), + ] +) + + +@pandas_udf(roid_schema) +def roid_catcher( + ra, + dec, + jd, + magpsf, + candid, + cjd, + cmagpsf, + fid, + ndethist, + sgscore1, + ssdistnr, + distpsnr1, + error_radius, + 
mag_criterion_same_fid, + mag_criterion_diff_fid, + orbit_tw, + orbit_error, + confirmed_sso, +): + """Determine if an alert is a potential Solar System object (SSO) using two criteria: 1. The alert has been flagged as an SSO by ZTF (MPC) within 5" 2. The alert satisfies Fink criteria for a SSO @@ -35,18 +85,31 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1): 4. If 2 detections, observations must be done within 30 min. The alerts are labeled using: - - [3] if the alert has been flagged by ZTF as SSO candidate - [2] if the alert has been flagged by Fink as SSO candidate - [1] if is the first time ZTF sees this object - [0] if it is likely not a SSO + * [5] if the alert has been associated with a candidate trajectory using an orbit estimator + * [4] if the alert has been associated with a candidate trajectory using a polyfit estimator + * [3] if the alert has been flagged by ZTF as SSO candidate + * [2] if the alert has been flagged by Fink as SSO candidate + * [1] if is the first time ZTF sees this object + * [0] if it is likely not a SSO Parameters ---------- + ra: Spark DataFrame Column + right ascension + dec: Spark DataFrame Column + declination jd: Spark DataFrame Column Observation Julian date at start of exposure [days] magpsf: Spark DataFrame Column Magnitude from PSF-fit photometry [mag] + candid: Spark DataFrame Column + alert identifier + cjd : Spark DataFrame Column + julian date history of the alerts + cmagpsf : Spark DataFrame Column + magnitude history of the alerts + fid: Spark DataFrame Column + filter identifier (for ZTF, 1 = g band and 2 = r band) ndethist: Spark DataFrame Column Number of spatially-coincident detections falling within 1.5 arcsec going back to beginning of survey; only detections that fell on the @@ -63,19 +126,46 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1): distpsnr1: Spark DataFrame Column Distance of closest source from PS1 catalog; if exists within 30 arcsec [arcsec] + 
error_radius: Spark DataFrame Column + error radius used to associate the alerts with a candidate trajectory + mag_criterion_same_fid: Spark DataFrame Column + keep the association where the difference of magnitude between two measurements of the + same filter is below this threshold. + mag_criterion_diff_fid: Spark DataFrame Column + keep the association where the difference of magnitude + between two measurements of different filters is below this threshold. + orbit_tw : int + time window used to filter the orbit + orbit_error: float + error radius to associate the alerts with the orbits + confirmed_sso: Spark DataFrame Column + if true, associate alerts with a flag equal to 3, + choose alerts with a flag equal to 1 or 2 otherwise. Returns ---------- - out: integer + roid: integer + 5 if the alert has been associated with a candidate trajectory using an orbit estimator + 4 if the alert has been associated with a candidate trajectory using a polyfit estimator 3 if the alert has been flagged by ZTF as SSO 2 if the alert has been flagged by Fink as SSO 1 if it is the first time ZTF sees this object 0 if it is likely not a SSO + ffdistnr : float list + distance from the trajectory prediction + - in arcmin if flag == 4 + - in arcsecond if flag == 5 + estimator_id: string list + The fink_fat trajectory identifier associated with the alerts (only if roid is 4 or 5) + - is an integer if associated with a trajectory candidate + - is a string if associated with an orbit Examples ---------- >>> from fink_utils.spark.utils import concat_col >>> from pyspark.sql import functions as F + >>> from fink_science.tester import add_roid_datatest + >>> add_roid_datatest(spark, True) >>> df = spark.read.load(ztf_alert_sample) @@ -92,24 +182,33 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1): # Perform the fit + classification (default model) >>> args = [ + ... 'candidate.ra', 'candidate.dec', + ... 'candidate.jd', 'candidate.magpsf', + ...
'candidate.candid', ... 'cjd', 'cmagpsf', + ... 'candidate.fid', ... 'candidate.ndethist', 'candidate.sgscore1', - ... 'candidate.ssdistnr', 'candidate.distpsnr1'] + ... 'candidate.ssdistnr', 'candidate.distpsnr1', + ... F.lit(15), F.lit(2), F.lit(2), F.lit(30), F.lit(15.0), F.lit(True) + ... ] >>> df = df.withColumn('roid', roid_catcher(*args)) # Drop temp columns >>> df = df.drop(*what_prefix) - >>> df.filter(df['roid'] == 2).count() - 3 - - >>> df.filter(df['roid'] == 3).count() + >>> df.filter(df['roid.roid'] == 2).count() + 175 + >>> df.filter(df['roid.roid'] == 3).count() + 6694 + >>> df.filter(df['roid.roid'] == 4).count() + 2 + >>> df.filter(df['roid.roid'] == 5).count() 3 """ flags = np.zeros_like(ndethist.values, dtype=int) # remove NaN - nalerthist = magpsf.apply(lambda x: np.sum(np.array(x) == np.array(x))) + nalerthist = cmagpsf.apply(lambda x: np.sum(np.array(x) == np.array(x))) # first detection f0 = ndethist == 1 @@ -128,7 +227,7 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1): # Remove long trend (within the observation) f3 = nalerthist == 2 - f4 = jd[f3].apply(lambda x: np.diff(x)[-1]) > (30. / (24.
* 60.)) + f4 = cjd[f3].apply(lambda x: np.diff(x)[-1]) > (30.0 / (24.0 * 60.0)) flags[f3 & f4] = 0 # Remove very long trend (outside the current observation) @@ -148,18 +247,45 @@ def roid_catcher(jd, magpsf, ndethist, sgscore1, ssdistnr, distpsnr1): f_ndethist = ndethist <= 5 f_nalerthist = nalerthist <= 5 - mask_roid = f_distance1 & f_distance2 & f_relative_distance & f_ndethist & f_nalerthist + mask_roid = ( + f_distance1 & f_distance2 & f_relative_distance & f_ndethist & f_nalerthist + ) flags[mask_roid] = 3 - return pd.Series(flags) + # fink_fat associations + flags, estimator_id, ffdistnr = fink_fat_association( + ra, + dec, + magpsf, + fid, + jd, + candid, + flags, + confirmed_sso, + error_radius, + mag_criterion_same_fid, + mag_criterion_diff_fid, + orbit_tw, + orbit_error, + ) + + return pd.DataFrame( + { + "roid": flags, + "ffdistnr": ffdistnr, + "estimator_id": estimator_id, + } + ) if __name__ == "__main__": - """ Execute the test suite """ + """Execute the test suite""" globs = globals() path = os.path.dirname(__file__) - ztf_alert_sample = 'file://{}/data/alerts/datatest'.format(path) + ztf_alert_sample = "file://{}/data/alerts/roid_datatest/alerts_sample_roid".format( + path + ) globs["ztf_alert_sample"] = ztf_alert_sample # Run the test suite diff --git a/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00000-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00000-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet new file mode 100644 index 00000000..94ec2e72 Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00000-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet differ diff --git a/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00001-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet 
b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00001-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet new file mode 100644 index 00000000..31e378fa Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00001-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet differ diff --git a/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00003-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00003-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet new file mode 100644 index 00000000..0c91d7c9 Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00003-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet differ diff --git a/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00005-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00005-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet new file mode 100644 index 00000000..0d60ab70 Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00005-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet differ diff --git a/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00007-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00007-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet new file mode 100644 index 00000000..aea9cdcf Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00007-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet differ diff --git a/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00009-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet 
b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00009-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet new file mode 100644 index 00000000..7736fbfd Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00009-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet differ diff --git a/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00011-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00011-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet new file mode 100644 index 00000000..a3b4bd12 Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00011-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet differ diff --git a/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00013-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00013-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet new file mode 100644 index 00000000..15b39879 Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/alerts_sample_roid/part-00013-0ec55ae3-a2d2-4481-a725-08640b6f851c-c000.snappy.parquet differ diff --git a/fink_science/data/alerts/roid_datatest/kalman.pkl b/fink_science/data/alerts/roid_datatest/kalman.pkl new file mode 100644 index 00000000..aea3c341 Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/kalman.pkl differ diff --git a/fink_science/data/alerts/roid_datatest/orbital.parquet b/fink_science/data/alerts/roid_datatest/orbital.parquet new file mode 100644 index 00000000..0d35064f Binary files /dev/null and b/fink_science/data/alerts/roid_datatest/orbital.parquet differ diff --git a/fink_science/tester.py b/fink_science/tester.py index 718fa4d0..d8b13305 100644 --- a/fink_science/tester.py +++ b/fink_science/tester.py @@ -16,8 +16,9 @@ import doctest import numpy as np + 
def regular_unit_tests(global_args: dict = None, verbose: bool = False): - """ Base commands for the regular unit test suite + """Base commands for the regular unit test suite Include this routine in the main of a module, and execute: python3 mymodule.py @@ -51,8 +52,9 @@ def regular_unit_tests(global_args: dict = None, verbose: bool = False): sys.exit(doctest.testmod(globs=global_args, verbose=verbose)[0]) + def spark_unit_tests(global_args: dict = None, verbose: bool = False): - """ Base commands for the Spark unit test suite + """Base commands for the Spark unit test suite Include this routine in the main of a module, and execute: python3 mymodule.py @@ -80,14 +82,14 @@ def spark_unit_tests(global_args: dict = None, verbose: bool = False): spark = SparkSession.builder.getOrCreate() conf = SparkConf() - confdic = { - "spark.python.daemon.module": "coverage_daemon" - } + confdic = {"spark.python.daemon.module": "coverage_daemon"} - if spark.version.startswith('2'): + if spark.version.startswith("2"): confdic.update( { - "spark.jars.packages": 'org.apache.spark:spark-avro_2.11:{}'.format(spark.version) + "spark.jars.packages": "org.apache.spark:spark-avro_2.11:{}".format( + spark.version + ) } ) elif spark.version.startswith('3'): @@ -97,15 +99,15 @@ def spark_unit_tests(global_args: dict = None, verbose: bool = False): "spark.jars.packages": 'org.apache.spark:spark-avro_2.12:{},{}'.format(spark.version, py4j_mod) } ) - conf.setMaster("local[2]") + conf.setMaster("local[1]") conf.setAppName("fink_science_test") for k, v in confdic.items(): conf.set(key=k, value=v) - spark = SparkSession\ - .builder\ - .appName("fink_science_test")\ - .config(conf=conf)\ + spark = ( + SparkSession.builder.appName("fink_science_test") + .config(conf=conf) .getOrCreate() + ) global_args["spark"] = spark