diff --git a/README.md b/README.md index 3e609cd..a272515 100644 --- a/README.md +++ b/README.md @@ -66,8 +66,8 @@ be super user). For example, the `borealis.daemon` is run with the following Description=Borealis data flow inotify daemon [Service] -User=radar -ExecStart=/home/radar/data_flow/inotify_daemons/borealis.daemon +User= +ExecStart=/home/{USER}/data_flow/inotify_daemons/borealis.daemon Restart=always [Install] @@ -106,12 +106,12 @@ HOST [address of the campus computer, either hostname or IP] where username and address are the same as those set in the config file. The output will be something like: ``` Success: -transfer@pgrdist205:~> ssh -N -f dataman@sdc-serv.usask.ca; ssh -O check dataman@sdc-serv.usask.ca +site_user@site:~> ssh -N -f [username]@[address]; ssh -O check [username]@[address] Master running (pid=15739) Failure: -transfer@pgrdist205:~> ssh -N -f dataman@sdc-serv.usask.ca; ssh -O check dataman@sdc-serv.usask.ca -Control socket connect(/home/transfer/.ssh/controlmasters/3354587955ba492d0d5f595f8619d902ac0192a7): No such file or directory +site_user@site:~> ssh -N -f [username]@[address]; ssh -O check [username]@[address] +Control socket connect(/home/[site_user]/.ssh/controlmasters/3354587955ba492d0d5f595f8619d902ac0192a7): No such file or directory ``` ### Installing data flow @@ -127,8 +127,8 @@ is required for the sending of inotify flags between computers. `ssh-keygen -t ecdsa -b 521` - Copy the public key to the destination computer: `ssh-copy-id user@host` - Computers that must be linked: Borealis -> Site-Linux, Site-Linux -> sdc-serv - - For telemetry purposes, each data flow computer must also be linked to the logman user on - sdc-serv, so copy the ssh keys to logman@sdc-serv as well + - For telemetry purposes, each data flow computer must also be linked to a logging user on campus, so copy the + ssh keys to that user on sdc-serv as well 4. Install the inotify daemon for the respective computer (for example, install borealis.daemon with borealis_dataflow.service on the Borealis computer). As super user, do the following: - Copy the correct `.service` file from `inotify_daemons/services/` to @@ -152,8 +152,8 @@ with borealis_dataflow.service on the Borealis computer). As super user, do the 7. For telemetry purposes, summary logs are availabe for each script in the `~/logs/[script name]/summary/` directory. These logs contain the status of all operations on each file and easily parseable to monitor data flow operation. Each script rsyncs the summary files to -logman@sdc-serv for uploading to the Engineering dashboard. SSH password-free connection must be -setup between each computer and logman@sdc-serv for this to work correctly. +the logging user on sdc-serv for uploading to the Engineering dashboard. SSH password-free connection must be +setup between each computer and logging user on sdc-serv for this to work correctly. 8. To modify the data flow easily, a `config.sh` file is provided. This file specifies: - If the data flow can use the NAS at a site - What Borealis filetypes are to be converted and restructured diff --git a/mirror/README.md b/mirror/README.md new file mode 100644 index 0000000..23f5eb7 --- /dev/null +++ b/mirror/README.md @@ -0,0 +1,55 @@ +## Mirror +This section of the Data Flow repo contains scripts related to the campus to mirror rawacf data flow. These scripts +are run via crontab on sdc-serv. The scripts that are scheduled via crontab are located in the top level directory. +The remaining scripts are located in the `tools/` subdirectory and are utility scripts for the data flow operations. + +### Main Scripts +- **download_vt_data** - This script logs onto the Virginia Tech (VT) server and downloads all rawacfs for a given +radar from the holding directory on the VT server to our holding directory on campus. Run the script like: + - `download_vt_data holding_dir radar` +- **sync_mirror_data.sh** - This script logs onto one of the SuperDARN mirrors and downloads all rawacfs that our +mirror is missing. Any rawacfs that are in the failed list or blocklist will not be downloaded from the external mirror. +This script is designed to run for a given mirror and yyyymm. The yyyymm argument is optional and if no yyyymm is given, +the script will only sync with the given mirror for the current yyyymm. Run the script like: + - `flock nssc_filelock -c 'sync_mirror_data.sh NSSC_holding NSSC yyyymm'` to sync with the NSSC server and + - `flock bas_filelock -c 'sync_mirror_data.sh BAS_holding BAS yyyymm'` to sync with the BAS server +- **gatekeeper_globus.py** - This script acts on one of the three holding directories to transfer the rawacfs to the +SuperDARN Canada mirror via the globus_sdk python library. Below is a brief overview of this process, see the +SuperDARN Canada Wiki for details: + - Remove any rawacfs in holding_dir if they are in the failed list or blocklist. + - Hash the holding_dir and compare to the hash files on the mirror. Remove matching files from holding_dir and move + nonmatching files to `nomatch/` subdirectory in holding_dir. + - Check that all remaining rawacfs can be successfully unzipped and are not empty. Files who fail any of these tests + are moved to `holding_dir/failed/`. + - Update the failed files list `all_failed.txt` on the mirror and transfer these failed files to `local_data/failed/` + on the mirror. + - Transfer the remaining files in holding_dir to the mirror at `raw/yyyy/mm/` as they have passed all checks + - Update yyyymm.hashes and master.hashes + + Run the script like: + + - python `-u gatekeeper_globus.py -d holding_dir -m mirror_dir` + +- **batch_sync_mirror** - This script is run on both a weekly and monthly schedule for each of the NSSC and BAS servers. +On the weekly run, the script syncs the previous 12 months between the USASK and NSSC (or BAS) mirrors. On +the monthly run, the script syncs all data since 2006 between the USASK and NSSC (or BAS) mirrors. Run the script like: + - `flock nssc_filelock -c 'batch_sync_mirror NSSC weekly'` or `flock nssc_filelock -c 'batch_sync_mirror NSSC +monthly'` for syncing with NSSC and + - `flock bas_filelock -c 'batch_sync_mirror BAS weekly'` or `flock bas_filelock -c 'batch_sync_mirror BAS monthly'` +for syncing with BAS + +### Tools +- **delete_files_globus.py** - This script is designed to log on to the USask SuperDARN mirror via globus in order to +check for and remove files given a list of files. Run the script like: + - python `delete_files_globus.py -t 'raw' -r 'mirror_root_dir/' -d 'deletions_dir/' + -l '~/log_dir/' files_to_delete.txt` + - For usage instructions run python `delete_files_globus.py -h` +- **flag_experiment_files.py** - Script to check for and move local special experiment files to a subdirectory. Main +usage is to move files out of the holding directory when special experiment files are not flagged earlier in the data +flow chain. Note that although this script will normally be run on the holding directory, it is capable of running +on any directory with RAWACFs and will move all special experiment files to a subdirectory called `special_experiments/`. +- **gatekeeper_class.py** - This script contains utility functions for `gatekeeper_globus.py` as well as the +'Gatekeeper' class that is instantiated at the beginning of `gatekeeper_globus.py` and whose methods are called +throughout the script. This script should not be executed directly in the command line. Instead, simply import the +'Gatekeeper' class and other functions into the running script. For example, both `gatekeeper_globus.py` and +`delete_files_globus.py` import items from `gatekeeper_class.py` in this way. \ No newline at end of file diff --git a/mirror/batch_sync_mirror b/mirror/batch_sync_mirror index ad2f976..f1c0702 100755 --- a/mirror/batch_sync_mirror +++ b/mirror/batch_sync_mirror @@ -10,6 +10,9 @@ # Compare our mirror to the BAS/NSSC mirror 1 month at a time going back to January 2006 # Download any rawacf files that we are missing. Runs monthly. +readonly USER=$(whoami) +readonly SCRIPTDIR=/home/${USER}/data_flow/mirror/ + # Valid MIRROR and MODE values readonly VALID_MIRRORS=("BAS" "NSSC") readonly VALID_MODES=("weekly" "monthly") @@ -54,7 +57,7 @@ then while [ "$d" != "${enddate}" ] do yearmonth=$(date -d "$d" +%Y%m) - /home/dataman/data_flow/mirror/sync_mirror_data.sh ${HOLDING_DIR} ${MIRROR} ${yearmonth} + ${SCRIPTDIR}/sync_mirror_data.sh ${HOLDING_DIR} ${MIRROR} ${yearmonth} d=$(date -d "$d + 1 month" +%Y%m%d) done @@ -72,13 +75,13 @@ then while [ "$d" != "${enddate}" ] do yearmonth=$(date -d "$d" +%Y%m) - /home/dataman/data_flow/mirror/sync_mirror_data.sh ${HOLDING_DIR} ${MIRROR} ${yearmonth} + ${SCRIPTDIR}/sync_mirror_data.sh ${HOLDING_DIR} ${MIRROR} ${yearmonth} d=$(date -d "$d + 1 month" +%Y%m%d) done # Update mirror comparison - HASHES_DIRS="/home/dataman/tmp_hashes_usask_${mirror}_cmp/${today}.*" - CMP_DIR=/home/dataman/Documents/mirror_comparison_${today}/ + HASHES_DIRS="/home/${USER}/tmp_hashes_usask_${mirror}_cmp/${today}.*" + CMP_DIR=/home/${USER}/Documents/mirror_comparison_${today}/ mkdir -p ${CMP_DIR}/sorted find ${HASHES_DIRS} -name '*data.different' -exec cat '{}' \; > ${CMP_DIR}/${mirror}.different diff --git a/mirror/download_vt_data b/mirror/download_vt_data index cba74af..420e738 100755 --- a/mirror/download_vt_data +++ b/mirror/download_vt_data @@ -6,7 +6,7 @@ # Modified from Kevin Sterne's script # # Directory structure of the VT server's holding area is: -# /home/usask/outgoing/sas +# /home/${REMOTEUSER}/outgoing/sas # /rkn # /hal # /bks @@ -19,9 +19,9 @@ # # In order for public keys to work properly, permissions and owners must be # set up correctly. -# For /home/usask/ The permissions must be 755 -# For /home/usask/.ssh the permissions must be 700 -# For /home/usask/authorized_keys the permissions must be 644 +# For /home/${REMOTEUSER}/ The permissions must be 755 +# For /home/${REMOTEUSER}/.ssh the permissions must be 700 +# For /home/${REMOTEUSER}/authorized_keys the permissions must be 644 # # This script downloads any available radar rawacf data from Virginia # Tech's server's holding area. @@ -45,6 +45,8 @@ ############################################################################## # Initialize Some Variables ############################################################################## +USER=$(whoami) + # Valid 3-letter RADAR values readonly VALID_RADARS=("ade" "adw" "bks" "bpk" "cly" "cve" "cvw" "dce" "dcn" "fhe" "fhw" "fir" "gbr" "hal" "han" "hkw" "hok" "ice" "icw" "inv" "jme" "kap" "ker" "kod" "ksr" "lyr" "mcm" "pgr" @@ -79,7 +81,7 @@ CURYEAR=$(date +%Y) CURMONTH=$(date +%m) # Setup logfile -LOGGINGDIR=/home/dataman/logs/vt/${CURYEAR}/${CURMONTH} # Add _test for testing purposes +LOGGINGDIR=/home/${USER}/logs/vt/${CURYEAR}/${CURMONTH} # Add _test for testing purposes LOGFILE=${LOGGINGDIR}/${DATE}.${RADAR}.log # Make the log file directory if it doesn't exist mkdir -p ${LOGGINGDIR} @@ -161,9 +163,9 @@ fi # Create shortcut for sftp command SFTP=/usr/bin/sftp # User, hostname and directory to connect to on VT server -USER=usask -REMOTEHOST=sd-data1.ece.vt.edu -REMOTEDIRBASE=/home/usask/outgoing +REMOTEUSER=${VT_USER} +REMOTEHOST=${VT_HOST} +REMOTEDIRBASE=/home/${REMOTEUSER}/outgoing REMOTEDIR=${REMOTEDIRBASE}/${RADAR} # Path to hashfile for files on VT server (e.g., /data/holding/sas/hashes.remote) HASHESFILE=hashes.remote @@ -178,7 +180,7 @@ REMOTEHASHESSTART=$(date +%s) cd ${LOCALRADARDIR} || exit # This will write message to stderr if sha1sum fails # ssh into VT and hash the rawacf files for the given radar, output to hashes.remote -ssh ${USER}@${REMOTEHOST} "cd ${REMOTEDIR}; ${HASHPROG} *${RADAR}*rawacf.bz2" > ${HASHESFILE} 2> ${HASHESERRORS} +ssh ${REMOTEUSER}@${REMOTEHOST} "cd ${REMOTEDIR}; ${HASHPROG} *${RADAR}*rawacf.bz2" > ${HASHESFILE} 2> ${HASHESERRORS} # Capture return value from the above sha1sum RETURNVALUE=$? REMOTEHASHESEND=$(date +%s) @@ -298,7 +300,7 @@ echo "Downloading files..." # Only execute sftp batch file if there is at least one file to download from VT if [[ ${totalFiles} -gt 0 ]] then - ${SFTP} -p -b ${SFTPBATCH} ${USER}@${REMOTEHOST} 2> ${SFTPERRORS} + ${SFTP} -p -b ${SFTPBATCH} ${REMOTEUSER}@${REMOTEHOST} 2> ${SFTPERRORS} fi DOWNLOADTIMEEND=$(date +%s) @@ -381,7 +383,7 @@ echo "Deleting files..." # Only execute sftp batch file if there is at least one file to remove from VT if [[ ${totalFilesDelete} -gt 0 ]] then - ${SFTP} -p -b ${SFTPBATCHDELETE} ${USER}@${REMOTEHOST} 2> ${SFTPERRORSDELETE} + ${SFTP} -p -b ${SFTPBATCHDELETE} ${REMOTEUSER}@${REMOTEHOST} 2> ${SFTPERRORSDELETE} fi DELETETIMEEND=$(date +%s) diff --git a/mirror/gatekeeper_globus.py b/mirror/gatekeeper_globus.py index bd417ba..9685326 100755 --- a/mirror/gatekeeper_globus.py +++ b/mirror/gatekeeper_globus.py @@ -44,7 +44,7 @@ import logging import argparse -from gatekeeper_class import Gatekeeper, parse_data_filename +from tools.gatekeeper_class import Gatekeeper, parse_data_filename # Make sure there is only one instance running of this script from tendo import singleton @@ -54,14 +54,19 @@ HOME = expanduser("~") TRANSFER_RT_FILENAME = f"{HOME}/.globus_transfer_rt" PERSONAL_UUID_FILENAME = f"{HOME}/.globusonline/lta/client-id.txt" +GATEKEEPER_APP_FILENAME = f"{HOME}/mirror_id_files/gatekeeper_app_id.txt" if isfile(PERSONAL_UUID_FILENAME): with open(PERSONAL_UUID_FILENAME) as f: PERSONAL_UUID = f.readline().strip() # Client ID retrieved from https://auth.globus.org/v2/web/developers -# gatekeeper_app_CLIENT_ID = 'bc9d5b7a-6592-4156-bfb8-aeb0fc4fb07e' # Saif's app -gatekeeper_app_CLIENT_ID = 'e70228d0-56a2-4d85-bf63-7fbccc92dcd3' # Rem's app +if isfile(GATEKEEPER_APP_FILENAME): + with open(GATEKEEPER_APP_FILENAME) as f: + file = f.readlines() + for line in file: + if "CM app" in line: + gatekeeper_app_CLIENT_ID = line.split("=")[1].split()[0] def main(): @@ -107,7 +112,7 @@ def main(): # Check script arguments as well as existence of various directories logger = gk.logger - # Clear out working directory /home/dataman/tmp/* before use + # Clear out working directory ~/tmp/* before use if isdir(gk.get_working_dir()): shutil.rmtree(gk.get_working_dir()) mkdir(gk.get_working_dir()) @@ -118,6 +123,7 @@ def main(): logger.info(f"Args: {args.holding} {args.mirror} {args.pattern}") + # Set holding directory, mirror directory, yearmonth, radar, and sync pattern from parsed arguments # Set holding directory, mirror directory, yearmonth, radar, and sync pattern from parsed arguments gk.set_holding_dir(args.holding) gk.set_mirror_root_dir(args.mirror) @@ -322,38 +328,35 @@ def main(): yearmonth_dict.pop(ym) else: logger.info(f"{ym} hash file retrieved from mirror.") - # sha1sum files in holding_dir and compare to yyyymm.hashes now in working dir (-c == compare) - command_string = f"cd {gk.get_holding_dir()}; sha1sum -c {gk.get_working_dir()}/{ym}.hashes" - sha1sum_process = subprocess.Popen(command_string, shell=True, - stdout=subprocess.PIPE, stderr=subprocess.PIPE) - out, err = sha1sum_process.communicate() - sha1sum_decoded_output = out.decode().split("\n") - sha1sum_decoded_error = err.decode().split("\n") - # Loop through result of sha1sum comparison for each file - # Only remove from files_to_upload if file exists both in holding_dir and hashfile in working_dir - # Need further investigation into "Failed open or read" and "" results - for sha1sum_result in sha1sum_decoded_output: - hashed_file = sha1sum_result.split(":")[0] - if sha1sum_result.find("FAILED open or read") != -1: - pass - # If hashes do not match, add file to nonmatching files list, remove from files_to_upload - elif sha1sum_result.find("FAILED") != -1: - logger.warning(f"{hashed_file} hash doesn't match. Adding to no match list, and removing from list of files to upload.") - non_matching_files.append(hashed_file) - files_to_upload_dict.pop(hashed_file) - # If hashes match, remove from files_to_upload as it is already on mirror - elif sha1sum_result.find("OK") != -1: - logger.info(f"{hashed_file} already exists on mirror and hash matches. Removing from files to upload.") - files_to_upload_dict.pop(hashed_file) - # Comment out removal of matching files from holding dir for testing purposes - try: - remove(f"{gk.get_holding_dir()}/{hashed_file}") - except OSError as error: - logger.error(f"Error trying to remove file: {error}.") - elif sha1sum_result == "": - pass - else: - logger.warning(f"Error, I don't know how to deal with: {sha1sum_result}.") + + # Create dictionary to contain filenames as keys and hashes as values for ym.hashes of current iteration + ym_hashes = {} + with open(f"{gk.get_working_dir()}/{ym}.hashes", 'r') as hash_file: + for line in hash_file: + (val, key) = line.split() + ym_hashes[key] = val + + # loop over files in holding dir for ym of current iteration and compare hashes to ym.hashes + for holding_file in list(yearmonth_dict[ym].keys()): + # Remove file from files to upload if this filename is already on the mirror + # Compare hashes to see if the file should go to nomatch/ directory or just be removed from holding + if holding_file in ym_hashes.keys(): + files_to_upload_dict.pop(holding_file) + # If hashes do not match, add file to nonmatching files list (to be moved to nomatch/ directory) + if files_to_upload_dict[holding_file]['hash'] != ym_hashes[holding_file]: + logger.warning(f"{holding_file} hash doesn't match. Adding to no match list, and removing " + f"from list of files to upload.") + non_matching_files.append(holding_file) + # If hashes match, remove from holding directory + else: + logger.info(f"{holding_file} already exists on mirror and hash matches. Removing from files " + f"to upload.") + # Comment out removal of matching files from holding dir for testing purposes + try: + remove(f"{gk.get_holding_dir()}/{holding_file}") + except OSError as error: + logger.error(f"Error trying to remove file: {error}.") + # If yyyymm.hashes DNE, create it ONLY IF yyyymm is the current year and month else: # Need to check if this is the current month, otherwise error out diff --git a/mirror/sync_mirror_data.sh b/mirror/sync_mirror_data.sh index df875a2..c672ff5 100755 --- a/mirror/sync_mirror_data.sh +++ b/mirror/sync_mirror_data.sh @@ -19,7 +19,7 @@ # 3) Find out which data files have been updated or added # 4) Remove any blocked files from the files to download # 5) Remove any previously failed files from the files to download -# 6) Download remaining files not on usask, blocked or previously failed +# 6) Download remaining files not on usask, not blocked, and not previously failed # # Emails are sent to indicate files that are different, files that are # missing on BAS/NSSC, files that are blocked or files that failed checks @@ -29,6 +29,8 @@ # Initialize some variables ############################################################################## +readonly USER=$(whoami) + # Valid HOLDINGDIR and MIRROR values readonly VALID_DIRS=("/data/holding/BAS" "/data/holding/NSSC") # Add /test/ for testing purposes readonly VALID_MIRRORS=("BAS" "NSSC") @@ -56,19 +58,14 @@ if [[ ! " ${VALID_MIRRORS[*]} " =~ " ${MIRROR} " ]]; then exit 1 fi -# Variables for Cedar user and paths -# readonly cedar_user=saifm@robot.cedar.alliancecan.ca -readonly cedar_user=rar129 -#readonly cedar_user=saifm -readonly cedar_login="${cedar_user}@robot.fir.alliancecan.ca" +# Variables for Cedar/Fir user and paths +readonly cedar_user=${FIR_USER} +readonly cedar_login=${FIR_LOGIN} # Add _test to Cedar paths for testing purposes -#readonly CEDAR_HASHES=/home/saifm/projects/rrg-kam136-ad/sdarn/chroot/sddata/raw/ -#readonly CEDAR_BLOCKLIST=/home/saifm/projects/rrg-kam136-ad/sdarn/chroot/sddata/.config/blocklist/ -#readonly CEDAR_FAILED=/home/saifm/projects/rrg-kam136-ad/sdarn/chroot/sddata/.config/all_failed.txt -readonly CEDAR_HASHES="/project/rrg-kam136-ad/sdarn/chroot/sddata/raw/" -readonly CEDAR_BLOCKLIST="/project/rrg-kam136-ad/sdarn/chroot/sddata/.config/blocklist/" -readonly CEDAR_FAILED="/project/rrg-kam136-ad/sdarn/chroot/sddata/.config/all_failed.txt" +readonly CEDAR_HASHES="/project/${FIR_PROJECT_ID}/sdarn/chroot/sddata/raw/" +readonly CEDAR_BLOCKLIST="/project/${FIR_PROJECT_ID}/sdarn/chroot/sddata/.config/blocklist/" +readonly CEDAR_FAILED="/project/${FIR_PROJECT_ID}/sdarn/chroot/sddata/.config/all_failed.txt" # Date/time variables STARTTIME=$(date +%s) @@ -83,13 +80,13 @@ mirror="${MIRROR,,}" # Set username, hostname, and path for sftp usage given the provided mirror if [[ "${MIRROR}" == "BAS" ]] then - USER=superdarn - REMOTEHOST=bslsuperdarnb.nerc-bas.ac.uk + REMOTEUSER=${BAS_USER} + REMOTEHOST=${BAS_HOST} REMOTEDIR=/data/superdarn/data/raw elif [[ "${MIRROR}" == "NSSC" ]] then - USER=dataman - REMOTEHOST=superdarn.mirror.nssdc.ac.cn + REMOTEUSER=${NSSC_USER} + REMOTEHOST=${NSSC_HOST} REMOTEDIR=/sddata/raw fi @@ -98,7 +95,7 @@ HASHPROG=/usr/bin/sha1sum SFTP=/usr/bin/sftp # Setup logfile -LOGDIR=/home/dataman/logs/${mirror}/${CURYEAR}/${CURMONTH}/ # Add _test for testing purposes +LOGDIR=/home/${USER}/logs/${mirror}/${CURYEAR}/${CURMONTH}/ # Add _test for testing purposes LOGFILE=${LOGDIR}${DATE_TIME}_${mirror}.log # Make the log file directory if it doesn't exist mkdir -p "${LOGDIR}" @@ -115,12 +112,12 @@ EMAILSUBJECT="sync_${mirror}_data - [${YYYYMM}]" # What is our holding directory for hashes files? # Double check that these directories are consistent between sync_mirror_data.sh and monthly batch_sync_mirror (for bas) # Add /test_mirror/ in all 4 of the below paths for testing purposes -LOCALHASHDIR=/home/dataman/tmp_hashes_usask_${mirror}_cmp/$DATE_TIME -MIRRORHASHDIR=/home/dataman/tmp_hashes_${mirror}/$DATE_TIME +LOCALHASHDIR=/home/${USER}/tmp_hashes_usask_${mirror}_cmp/$DATE_TIME +MIRRORHASHDIR=/home/${USER}/tmp_hashes_${mirror}/$DATE_TIME # What is our holding directory for blocked files? -LOCALBLDIR=/home/dataman/tmp_blocklist/$DATE_TIME +LOCALBLDIR=/home/${USER}/tmp_blocklist/$DATE_TIME # What is our holding directory for previously failed files? -LOCALFAILEDDIR=/home/dataman/tmp_failed/$DATE_TIME +LOCALFAILEDDIR=/home/${USER}/tmp_failed/$DATE_TIME # Make sure above paths exist mkdir -p "${LOCALHASHDIR}" "${MIRRORHASHDIR}" "${LOCALBLDIR}" "${LOCALFAILEDDIR}" @@ -213,9 +210,9 @@ echo "exit" >> "${SFTPBATCH}" # Execute sftp batch file, logging errors to sftp error file # Add use of key for NSSC connection for migration if [[ "${MIRROR}" == "BAS" ]]; then - ${SFTP} -b "${SFTPBATCH}" ${USER}@${REMOTEHOST} >> "${LOGFILE}" 2> "${SFTPERRORS}" + ${SFTP} -b "${SFTPBATCH}" ${REMOTEUSER}@${REMOTEHOST} >> "${LOGFILE}" 2> "${SFTPERRORS}" elif [[ "${MIRROR}" == "NSSC" ]]; then - ${SFTP} -i "~/.ssh/id_NSSC" -b "${SFTPBATCH}" ${USER}@${REMOTEHOST} >> "${LOGFILE}" 2> "${SFTPERRORS}" + ${SFTP} -i "~/.ssh/id_NSSC" -b "${SFTPBATCH}" ${REMOTEUSER}@${REMOTEHOST} >> "${LOGFILE}" 2> "${SFTPERRORS}" fi if [[ -s $SFTPERRORS ]] @@ -412,9 +409,9 @@ then echo "Getting files..." # Add use of key for NSSC connection for migration if [[ "${MIRROR}" == "BAS" ]]; then - ${SFTP} -b "${SFTPBATCH}" ${USER}@${REMOTEHOST} >> "${LOGFILE}" 2> "${SFTPERRORS}" + ${SFTP} -b "${SFTPBATCH}" ${REMOTEUSER}@${REMOTEHOST} >> "${LOGFILE}" 2> "${SFTPERRORS}" elif [[ "${MIRROR}" == "NSSC" ]]; then - ${SFTP} -i "~/.ssh/id_NSSC" -b "${SFTPBATCH}" ${USER}@${REMOTEHOST} >> "${LOGFILE}" 2> "${SFTPERRORS}" + ${SFTP} -i "~/.ssh/id_NSSC" -b "${SFTPBATCH}" ${REMOTEUSER}@${REMOTEHOST} >> "${LOGFILE}" 2> "${SFTPERRORS}" fi else # The script already checked and exited if totalToDownload -eq 0 diff --git a/mirror/tools/delete_files_globus.py b/mirror/tools/delete_files_globus.py new file mode 100755 index 0000000..1ae8c71 --- /dev/null +++ b/mirror/tools/delete_files_globus.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python +# coding: utf-8 +""" +Last modification 201706 by Kevin Krieger + +This script is designed to log on to the University of Saskatchewan globus +SuperDARN mirror in order to check for and remove files given a list of files + +Example of script call: +python /path/to/delete_files_globus.py -t 'raw' -r 'chroot/sddata/' -d 'local_data/deletions/' + -l '~/logs/deletions_globus/' ~/mirror_blocklists/cve/${year}_cve_files_to_delete.txt + +See 'Removing Blocked Files from the Mirror' subsection of Data Flow section of SDARN wiki for more info +""" + +from gatekeeper_class import Gatekeeper +from os.path import expanduser, isfile, isdir +import argparse +import sys +from datetime import datetime + +HOME = expanduser("~") +TRANSFER_RT_FILENAME = f"{HOME}/.globus_transfer_rt" +GATEKEEPER_APP_FILENAME = f"{HOME}/mirror_id_files/gatekeeper_app_id.txt" + +# Client ID retrieved from https://auth.globus.org/v2/web/developers +if isfile(GATEKEEPER_APP_FILENAME): + with open(GATEKEEPER_APP_FILENAME) as f: + file = f.readlines() + for line in file: + if "CM app" in line: + CLIENT_ID = line.split("=")[1].split()[0] + +data_types = ['raw', 'dat', 'fit', 'map', 'grid', 'summary'] + +if __name__ == '__main__': + cur_date = datetime.now().strftime("%Y%m%d.%H%M") + + parser = argparse.ArgumentParser() + parser.add_argument("file_list", + help="List of files to delete from the mirror, one per line") + parser.add_argument("-t", "--data_type", + help="One of {} Default: 'raw'".format(data_types), + default='raw') + parser.add_argument("-r", "--mirror_root", help="Mirror root directory", + default="~/test_mirror") + parser.add_argument("-d", "--deletions_directory", + help="Directory on endpoint to store deleted files", + default="~/test_mirror/test_deletions") + parser.add_argument("-l", "--logging_directory", + help="Directory to store log files", + default=HOME+"/logs/deletions_globus/") + args = parser.parse_args() + file_list = args.file_list + data_type = args.data_type + mirror_root = args.mirror_root + deletions_directory = args.deletions_directory + log_directory = args.logging_directory + + if not isdir(log_directory): + sys.exit("Logging directory {} doesn't exist.".format(log_directory)) + + log_file_name = "{}/{}".format(log_directory, cur_date) + logfile = open(log_file_name, 'a') + logfile.write(cur_date+"\n") + + # Open the transfer refresh token file if it exists + if isfile(TRANSFER_RT_FILENAME): + with open(TRANSFER_RT_FILENAME) as f: + gk = Gatekeeper(CLIENT_ID, transfer_rt=f.readline()) + else: + gk = Gatekeeper(CLIENT_ID) + + gk.set_mirror_root_dir(mirror_root) + + # Get the files to delete into a clean python list + # (get rid of newlines, whitespaces, incorrect datatype lines) + with open(file_list) as f: + files_to_delete = f.readlines() + files_to_delete = [x.strip() for x in files_to_delete] + files_to_delete = [x for x in files_to_delete if data_type in x] + + # Download hashes files + gk.get_hashes_all(data_type=data_type) + if not gk.wait_for_last_task(timeout_s=600): + logfile.write("Get hashes all didn't complete in time. Exiting\n") + sys.exit("Get hashes all didn't complete in time.") + + # Now for each file to remove from the mirror, go through hashes files and find it, remove + # the line and put back all updated hashes files + updated_hashes = [] + files_not_found = [] + for file_to_delete in files_to_delete: + year = file_to_delete[0:4] + month = file_to_delete[4:6] + logfile.write("{}: year: {} month: {}\n".format(file_to_delete, year, month)) + try: + with open("{}/{}{}.hashes".format(gk.get_working_dir(), year, month)) as hashfile: + files_list = hashfile.readlines() + except IOError: + # Just exit if we didn't find the hash file, that's a problem requiring human insight + logfile.write("Could not open {}{}.hashes, does it exist? Exiting\n".format(year, month)) + sys.exit(1) + + found = False + for f in files_list: + if file_to_delete in f: + # it exists. Remove it from the list + found = True + files_list = [x for x in files_list if f not in x] + logfile.write("Removed {} from {}{}.hashes\n".format(f.strip(), year, month)) + updated_hashes.append("{}{}.hashes".format(year, month)) + with open("{}/{}{}.hashes".format(gk.get_working_dir(), + year, month), 'w') as hashfile: + hashfile.writelines(files_list) + break + if not found: + files_not_found.append(file_to_delete) + logfile.write("{} DNE in {}{}.hashes for data type {}\n".format(file_to_delete, + year, month, + data_type)) + files_to_delete = [x for x in files_to_delete if x != file_to_delete] + + logfile.write("Files to delete:\n") + logfile.writelines(files_to_delete) + logfile.write("\nFiles not found:\n") + logfile.writelines(files_not_found) + updated_hashes = list(set(updated_hashes)) + logfile.write("Updated hashes files: {}\n".format(updated_hashes)) + + # Now that we have files to delete and updated_hashes files, upload the new hashes and then + # remove the files, making sure both succeed + for updated_hash_file in updated_hashes: + year = updated_hash_file[0:4] + month = updated_hash_file[4:6] + gk.put_hashes(year, month, data_type) + while not gk.wait_for_last_task(): + logfile.write("Still waiting for {}{}.hashes to upload...\n".format(year, month)) + continue + + if len(files_to_delete) > 0: + gk.move_files_on_endpoint(files_to_delete, + "{}/{}/".format(deletions_directory, cur_date), + data_type=data_type) + files_to_delete = [] + for f in files_not_found: + year = f[0:4] + month = f[4:6] + if gk.check_for_file_existence("{}/{}/{}/{}/{}".format(gk.get_mirror_root_dir(), + data_type, year, month, + f.strip('\n'))): + logfile.write("{} on mirror but not in hashes file! Removing\n".format(f.strip('\n'))) + files_to_delete.append(f) + + logfile.write("Files not found in hashes but still on mirror:\n") + logfile.writelines(files_to_delete) + if len(files_to_delete) > 0: + gk.move_files_on_endpoint(files_to_delete, + "{}/{}".format(deletions_directory, cur_date), + data_type=data_type) diff --git a/mirror/tools/flag_experiment_files.py b/mirror/tools/flag_experiment_files.py new file mode 100644 index 0000000..fd4bff5 --- /dev/null +++ b/mirror/tools/flag_experiment_files.py @@ -0,0 +1,62 @@ +import pydarnio +import os +import argparse +""" +Script to check for and move local special experiment files to a subdirectory. Main usage is to move files out of the +holding directory when special experiment files are not flagged earlier in the data flow chain + +Usage: +Call check_files(filepath) for a given path to see the list of files (if any) that are from special experiments. +Then, call move_files(filepath) to move those files to a subdirectory. +This could probably be done in a better way where the output of check_files() could be used for the move_files() + function to avoid redundancy. This has been done this way to allow for a manual check of the file list before + moving the flagged files to the subdirectory. + +Normal CPIDs: + 151 -> Normalscan + 157 -> Normalsound + 191 -> Interleavesound + 3503 -> Twofsound +""" + + +# Function to find and move files with abnormal CPIDs at a given path to a subdirectory +def move_files(filepath): + normal_cpid = (151, 157, 191, 3503) + file_path = filepath + subdir = f"{file_path}/special_experiments/" + for f in sorted(os.listdir(file_path)): + if os.path.isfile(f"{file_path}/{f}"): + rec = pydarnio.read_rawacf(f"{file_path}/{f}", mode="sniff") + if rec['cp'] not in normal_cpid: + print(f"Not normal op! cpid = {rec['cp']}. Moving file {f}") + os.rename(f"{file_path}/{f}", f"{subdir}/{f}") + else: + print(f"Normal op! cpid = {rec['cp']}. {f}") + + +# Function to find and log files with abnormal CPIDs at a given path +def check_files(filepath): + normal_cpid = (151, 157, 191, 3503) + file_path = filepath + file_list = [] + for f in sorted(os.listdir(file_path)): + if os.path.isfile(f"{file_path}/{f}"): + rec = pydarnio.read_rawacf(f"{file_path}/{f}", mode="sniff") + if rec['cp'] not in normal_cpid: + print(f"Not normal op! cpid = {rec['cp']}. Moving file {f}") + file_list.append(f) + else: + print(f"Normal op! cpid = {rec['cp']}. {f}") + + print(file_list) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("-p", "--path", help="Directory to scan for special experiment files", default='') + args = parser.parse_args() + filepath = args.path + + check_files(filepath) + # move_files(filepath) diff --git a/mirror/gatekeeper_class.py b/mirror/tools/gatekeeper_class.py similarity index 98% rename from mirror/gatekeeper_class.py rename to mirror/tools/gatekeeper_class.py index cd5013f..ca28fa7 100755 --- a/mirror/gatekeeper_class.py +++ b/mirror/tools/gatekeeper_class.py @@ -4,7 +4,7 @@ import inspect from datetime import datetime, timedelta from os.path import expanduser, isfile, getsize, isdir -from os import listdir, mkdir, remove, rename, stat +from os import listdir, mkdir, remove, rename, stat, getlogin import shutil import fnmatch import sys @@ -20,11 +20,20 @@ HOME = expanduser("~") TRANSFER_RT_FILENAME = f"{HOME}/.globus_transfer_rt" PERSONAL_UUID_FILENAME = f"{HOME}/.globusonline/lta/client-id.txt" +MIRROR_UUID_FILENAME = f"{HOME}/mirror_id_files/mirror_uuid.txt" if isfile(PERSONAL_UUID_FILENAME): with open(PERSONAL_UUID_FILENAME) as f: PERSONAL_UUID = f.readline().strip() +# Get the UUID for our endpoint on FIR +if isfile(MIRROR_UUID_FILENAME): + with open(MIRROR_UUID_FILENAME) as f: + file = f.readlines() + for line in file: + if "CM UUID" in line: + MIRROR_UUID = line.split("=")[1].split()[0] + def extendable_logger(log_name, file_name, level=logging.INFO): """ Will set up and format a logger referenced as such: logger.info(msg), logger.warning(msg), etc. @@ -142,7 +151,7 @@ def __init__(self, client_id, client_secret=None, transfer_rt=None, working_dir= self.possible_data_types = ['raw', 'dat'] # Setup logger - logdir = "/home/dataman/logs/globus" # Add _test for testing purposes + logdir = f"/{HOME}/logs/globus" # Add _test for testing purposes logfile = (f"{logdir}/{self.cur_year:04d}/{self.cur_month:02d}/{self.cur_year:04d}{self.cur_month:02d}" f"{self.cur_day:02d}.{self.cur_hour:02d}{self.cur_minute:02d}_globus_gatekeeper.log") # Make sure year and month directories for logfile exist @@ -159,10 +168,8 @@ def __init__(self, client_id, client_secret=None, transfer_rt=None, working_dir= # Get a transfer client # Note that this uuid is the new cedar globus version 5 uuid, and hardcoded here due to hacking # this shit together in a quick timeframe. Ideally this would be searched and found programmatically via the function below "get_superdarn_mirror_uuid, which works to get the correct uuid, but we need a transfer client to use it, but we need the uuid to get a transfer client... so yeah, chicken and egg" - # self.mirror_uuid = '8dec4129-9ab4-451d-a45f-5b4b8471f7a3' - # self.mirror_uuid = '88cd829c-75fa-44e6-84bb-42e6250afaea' - # self.mirror_uuid = "bc9d5b7a-6592-4156-bfb8-aeb0fc4fb07e" - self.mirror_uuid = '087f175e-9e9c-42cc-9efc-667d25b64fa0' # SuperDARN Mirror (Cedar/Fir) UUID + # TO DO: Get the mirror_uuid using a function, so we don't have to read it from a file. + self.mirror_uuid = MIRROR_UUID self.transfer_client = self.get_transfer_client() # Email information ########################################################## @@ -171,7 +178,7 @@ def __init__(self, client_id, client_secret=None, transfer_rt=None, working_dir= # emailMessage is initialized to nothing here, and filled in with an # appropriate message depending upon the reason for the email. self.email_recipients = ['superdarn_engineers@usask.ca'] - self.email_from = 'dataman' + self.email_from = getlogin() self.current_time = datetime.now() self.email_subject = '[Gatekeeper Globus] ' + self.current_time.strftime("%Y%m%d.%H%M : ") self.smtp_server = 'localhost'