From 3eaeb6454d06f8bfc2e2306fbc2e8f76d90c2189 Mon Sep 17 00:00:00 2001 From: Harsh <154314702+harsh-0409@users.noreply.github.com> Date: Tue, 18 Mar 2025 22:01:06 +0530 Subject: [PATCH 1/3] Added batch processing and transaction handling in update_pageviews function - Added explicit transaction handling using wp10db.begin() to ensure all updates are treated as a single unit. - Implemented error handling with rollback to maintain data integrity in case of errors during the update process. - Introduced a batch commit strategy to commit changes after processing a specified number of rows (commit_after) to optimize performance and reduce overhead. - Updated logging to reflect the number of committed updates. --- wp1/scores.py | 59 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 36 insertions(+), 23 deletions(-) diff --git a/wp1/scores.py b/wp1/scores.py index 8688053f4..fbdbe4c76 100644 --- a/wp1/scores.py +++ b/wp1/scores.py @@ -178,31 +178,44 @@ def update_db_pageviews(wp10db, lang, article, page_id, views): }) -def update_pageviews(filter_lang=None, commit_after=50000): - download_pageviews() +def update_pageviews(filter_lang=None, commit_after=10000): + download_pageviews() - # Convert filter lang to bytes if necessary - if filter_lang is not None and isinstance(filter_lang, str): - filter_lang = filter_lang.encode('utf-8') + if filter_lang is not None and isinstance(filter_lang, str): + filter_lang = filter_lang.encode('utf-8') - if filter_lang is None: - logger.info('Updating all pageviews') - else: - logger.info('Updating pageviews for %s', filter_lang.decode('utf-8')) - - wp10db = wp10_connect() - n = 0 - for lang, article, page_id, views in pageview_components(): - if filter_lang is None or lang == filter_lang: - update_db_pageviews(wp10db, lang, article, page_id, views) - - n += 1 - if n >= commit_after: - logger.debug('Committing') - wp10db.commit() - n = 0 - wp10db.commit() - logger.info('Done') + if filter_lang is None: + 
logger.info('Updating all pageviews') + else: + logger.info('Updating pageviews for %s', filter_lang.decode('utf-8')) + + wp10db = wp10_connect() + + try: + # Start a transaction + wp10db.begin() + + n = 0 # Counter for the number of updates + for lang, article, page_id, views in pageview_components(): + if filter_lang is None or lang == filter_lang: + update_db_pageviews(wp10db, lang, article, page_id, views) + n += 1 + + # Commit every commit_after rows + if n >= commit_after: + wp10db.commit() + logger.info('Committed %d updates', n) + n = 0 # Reset counter for the next batch + + if n > 0: # Commit any remaining updates + wp10db.commit() + logger.info('Committed remaining %d updates', n) + + logger.info('Done') + + except Exception as e: + logger.exception('Error during pageview update, rolling back transaction') + wp10db.rollback() # Rollback in case of error if __name__ == '__main__': From 4c2161a8eecdf591f40bc9770f3178426e2f62de Mon Sep 17 00:00:00 2001 From: Harsh <154314702+harsh-0409@users.noreply.github.com> Date: Tue, 18 Mar 2025 22:11:21 +0530 Subject: [PATCH 2/3] Update scores.py - Atomic Update: The transaction is started with wp10db.begin() before processing any rows and committed only once after the loop completes. This ensures that the entire update is applied as a single atomic unit. - Error Handling & Rollback: If an error occurs during processing, the except block rolls back the entire transaction, maintaining data integrity. - Removal of Batch Commits: The previous batch commit strategy (committing after every 50,000 rows) has been removed. As noted by @audiodude, committing in batches would break the requirement to update the entire dataset in one transaction. 
--- wp1/scores.py | 30 +++++++++--------------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/wp1/scores.py b/wp1/scores.py index fbdbe4c76..19eac1163 100644 --- a/wp1/scores.py +++ b/wp1/scores.py @@ -178,7 +178,7 @@ def update_db_pageviews(wp10db, lang, article, page_id, views): }) -def update_pageviews(filter_lang=None, commit_after=10000): +def update_pageviews(filter_lang=None): download_pageviews() if filter_lang is not None and isinstance(filter_lang, str): @@ -190,32 +190,20 @@ def update_pageviews(filter_lang=None, commit_after=10000): logger.info('Updating pageviews for %s', filter_lang.decode('utf-8')) wp10db = wp10_connect() - try: - # Start a transaction + # Start a single transaction for the entire update wp10db.begin() - - n = 0 # Counter for the number of updates for lang, article, page_id, views in pageview_components(): if filter_lang is None or lang == filter_lang: update_db_pageviews(wp10db, lang, article, page_id, views) - n += 1 - - # Commit every commit_after rows - if n >= commit_after: - wp10db.commit() - logger.info('Committed %d updates', n) - n = 0 # Reset counter for the next batch - - if n > 0: # Commit any remaining updates - wp10db.commit() - logger.info('Committed remaining %d updates', n) - - logger.info('Done') - + # Commit the transaction only once after processing all rows + wp10db.commit() + logger.info('Done updating pageviews') except Exception as e: - logger.exception('Error during pageview update, rolling back transaction') - wp10db.rollback() # Rollback in case of error + # Roll back the entire transaction in case of any error + wp10db.rollback() + logger.exception("Error during update_pageviews; transaction rolled back") + raise if __name__ == '__main__': From 32f79b7f351d21430069d96099606e9ba6e7414f Mon Sep 17 00:00:00 2001 From: Harsh <154314702+harsh-0409@users.noreply.github.com> Date: Thu, 20 Mar 2025 14:57:42 +0530 Subject: [PATCH 3/3] Restored the 2-space indentation. 
--- wp1/scores.py | 74 +++++++++++++++++++++++++-------------------------- 1 file changed, 37 insertions(+), 37 deletions(-) diff --git a/wp1/scores.py b/wp1/scores.py index 19eac1163..dd1b1d639 100644 --- a/wp1/scores.py +++ b/wp1/scores.py @@ -14,7 +14,7 @@ from wp1.wp10_db import connect as wp10_connect PageviewRecord = namedtuple('PageviewRecord', - ['lang', 'name', 'page_id', 'views']) + ['lang', 'name', 'page_id', 'views']) logger = logging.getLogger(__name__) @@ -29,9 +29,9 @@ def wiki_languages(): r = requests.get( - 'https://wikistats.wmcloud.org/api.php?action=dump&table=wikipedias&format=csv', - headers={'User-Agent': WP1_USER_AGENT}, - timeout=60, + 'https://wikistats.wmcloud.org/api.php?action=dump&table=wikipedias&format=csv', + headers={'User-Agent': WP1_USER_AGENT}, + timeout=60, ) try: r.raise_for_status() @@ -53,8 +53,8 @@ def get_pageview_url(prev=False): now = get_current_datetime() dt = datetime(now.year, now.month, 1) - timedelta(weeks=weeks) return dt.strftime( - 'https://dumps.wikimedia.org/other/pageview_complete/monthly/' - '%Y/%Y-%m/pageviews-%Y%m-user.bz2') + 'https://dumps.wikimedia.org/other/pageview_complete/monthly/' + '%Y/%Y-%m/pageviews-%Y%m-user.bz2') def get_pageview_file_path(filename): @@ -167,43 +167,43 @@ def pageview_components(): def update_db_pageviews(wp10db, lang, article, page_id, views): with wp10db.cursor() as cursor: cursor.execute( - '''INSERT INTO page_scores (ps_lang, ps_page_id, ps_article, ps_views) - VALUES (%(lang)s, %(page_id)s, %(article)s, %(views)s) - ON DUPLICATE KEY UPDATE ps_views = %(views)s - ''', { - 'lang': lang, - 'page_id': page_id, - 'article': article, - 'views': views - }) + '''INSERT INTO page_scores (ps_lang, ps_page_id, ps_article, ps_views) + VALUES (%(lang)s, %(page_id)s, %(article)s, %(views)s) + ON DUPLICATE KEY UPDATE ps_views = %(views)s + ''', { + 'lang': lang, + 'page_id': page_id, + 'article': article, + 'views': views + }) def update_pageviews(filter_lang=None): - 
download_pageviews() + download_pageviews() - if filter_lang is not None and isinstance(filter_lang, str): - filter_lang = filter_lang.encode('utf-8') + if filter_lang is not None and isinstance(filter_lang, str): + filter_lang = filter_lang.encode('utf-8') - if filter_lang is None: - logger.info('Updating all pageviews') - else: - logger.info('Updating pageviews for %s', filter_lang.decode('utf-8')) + if filter_lang is None: + logger.info('Updating all pageviews') + else: + logger.info('Updating pageviews for %s', filter_lang.decode('utf-8')) - wp10db = wp10_connect() - try: - # Start a single transaction for the entire update - wp10db.begin() - for lang, article, page_id, views in pageview_components(): - if filter_lang is None or lang == filter_lang: - update_db_pageviews(wp10db, lang, article, page_id, views) - # Commit the transaction only once after processing all rows - wp10db.commit() - logger.info('Done updating pageviews') - except Exception as e: - # Roll back the entire transaction in case of any error - wp10db.rollback() - logger.exception("Error during update_pageviews; transaction rolled back") - raise + wp10db = wp10_connect() + try: + # Start a single transaction for the entire update + wp10db.begin() + for lang, article, page_id, views in pageview_components(): + if filter_lang is None or lang == filter_lang: + update_db_pageviews(wp10db, lang, article, page_id, views) + # Commit the transaction only once after processing all rows + wp10db.commit() + logger.info('Done updating pageviews') + except Exception as e: + # Roll back the entire transaction in case of any error + wp10db.rollback() + logger.exception("Error during update_pageviews; transaction rolled back") + raise if __name__ == '__main__':