11#!/usr/bin/env python3
22import datetime
3+ import hashlib
4+ import logging
35import uuid
46from typing import Any
57
68import asyncpg
9+ import dateutil .parser
710import httpx
811from pydantic import BaseModel
912
1417from db import dbh
1518from gitlab_model import MergeRequestPayload
1619
20+ logger = logging .getLogger (__name__ )
21+
1722
1823class MRMessRef (BaseModel ):
1924 merge_request_message_ref_id : int
@@ -138,11 +143,9 @@ async def merge_request(
138143 conversation_tokens : list [str ],
139144 participant_ids_filter : list [int ],
140145):
146+ payload_fingerprint = hashlib .sha256 (mr .model_dump_json ().encode ("utf8" )).hexdigest ()
147+ logger .debug ("payload fingerprint: %s" , payload_fingerprint )
141148 mri = await dbh .get_merge_request_ref_infos (mr )
142- convtoken_to_msgrefs = await get_or_create_message_refs (
143- mri .merge_request_ref_id ,
144- conversation_tokens ,
145- )
146149
147150 participant_found = True
148151 if participant_ids_filter :
@@ -159,7 +162,7 @@ async def merge_request(
159162 connection : asyncpg .Connection
160163
161164 if mr .object_attributes .action in ("update" ):
162- ... # Update MR info (head_pipeline_id)
165+ # Update MR info (head_pipeline_id)
163166 async with await database .acquire () as connection :
164167 row = await connection .fetchrow (
165168 """UPDATE merge_request_ref
@@ -173,6 +176,42 @@ async def merge_request(
173176 if row is not None :
174177 mri .merge_request_extra_state = row ["merge_request_extra_state" ]
175178
179+ # if it's a transition from draft to ready
180+ # - Delete all messages related to this MR prior to the current event update
181+ # then create_or_update_message will re-post new message
182+ # to have cards being the most recent in the feeds.
183+ # Rows are used as lock to avoid race condition when multiple instances can receive hooks
184+ # for the same MR (multiple webhook same project, multiple instances [kube?])
185+ if mr .changes and "draft" in mr .changes and not mr .object_attributes .draft :
186+ assert mr .object_attributes .updated_at is not None
187+ update_ref_datetime = dateutil .parser .parse (mr .object_attributes .updated_at )
188+ message_expiration = datetime .timedelta (seconds = 0 )
189+ async with connection .transaction ():
190+ res = await connection .fetch (
191+ """SELECT merge_request_message_ref_id, message_id
192+ FROM merge_request_message_ref
193+ WHERE merge_request_ref_id = $1 AND created_at < $2
194+ FOR UPDATE""" ,
195+ mri .merge_request_ref_id ,
196+ update_ref_datetime ,
197+ )
198+ for row in res :
199+ message_id = row .get ("message_id" )
200+ if message_id is not None :
201+ await connection .execute (
202+ """INSERT INTO msg_to_delete
203+ (message_id, expire_at)
204+ VALUES
205+ ($1, now()+$2::INTERVAL)""" ,
206+ str (message_id ),
207+ message_expiration ,
208+ )
209+ await connection .execute (
210+ "DELETE FROM merge_request_message_ref WHERE merge_request_message_ref_id = $1" ,
211+ row .get ("merge_request_message_ref_id" ),
212+ )
213+ periodic_cleanup .reschedule ()
214+
176215 if mr .object_attributes .action in ("approved" , "unapproved" ):
177216 v = mr .user .model_dump ()
178217 v ["status" ] = mr .object_attributes .action
@@ -189,13 +228,23 @@ async def merge_request(
189228 )
190229
191230 mri = await dbh .get_merge_request_ref_infos (mr )
192- card = render (mri )
231+ should_be_collapsed : bool = mr .object_attributes .draft or mr .object_attributes .work_in_progress
232+ card = render (
233+ mri ,
234+ collapsed = should_be_collapsed ,
235+ show_collapsible = should_be_collapsed ,
236+ )
193237 summary = (
194238 f"MR { mri .merge_request_payload .object_attributes .state } :"
195239 f" { mri .merge_request_payload .object_attributes .title } \n "
196240 f"on { mri .merge_request_payload .project .path_with_namespace } "
197241 )
198242
243+ convtoken_to_msgrefs = await get_or_create_message_refs (
244+ mri .merge_request_ref_id ,
245+ conversation_tokens ,
246+ )
247+
199248 if mr .object_attributes .action in ("open" , "reopen" ) or True :
200249 async with httpx .AsyncClient () as client :
201250 for ct in conversation_tokens :
@@ -215,21 +264,13 @@ async def merge_request(
215264 or not participant_found ,
216265 )
217266
218- if (
219- mr .object_attributes .action
220- in (
221- "merge" ,
222- "close" ,
223- )
224- or mr .object_attributes .state
225- in (
226- "closed" ,
227- "merged" ,
228- )
229- or mr .object_attributes .draft
230- or mr .object_attributes .work_in_progress
267+ if mr .object_attributes .action in (
268+ "merge" ,
269+ "close" ,
270+ ) or mr .object_attributes .state in (
271+ "closed" ,
272+ "merged" ,
231273 ):
232-
233274 message_expiration = datetime .timedelta (seconds = 30 )
234275 async with await database .acquire () as connection :
235276 res = await connection .fetch (
0 commit comments