@@ -10,12 +10,14 @@ use matrix_sdk::{
10
10
media:: MediaRequest ,
11
11
room:: { Receipts , RoomMember } ,
12
12
ruma:: {
13
- api:: client:: { receipt:: create_receipt:: v3:: ReceiptType , session:: get_login_types:: v3:: LoginType } ,
13
+ api:: client:: { receipt:: create_receipt:: v3:: ReceiptType , session:: get_login_types:: v3:: LoginType ,
14
+ search:: search_events:: v3:: { Criteria , Categories , Request , }
15
+ } ,
14
16
events:: {
15
17
receipt:: ReceiptThread , room:: {
16
18
message:: { ForwardThread , RoomMessageEventContent } ,
17
19
MediaSource ,
18
- } , FullStateEventContent
20
+ } , FullStateEventContent , AnyTimelineEvent ,
19
21
} ,
20
22
OwnedEventId , OwnedMxcUri , OwnedRoomAliasId , OwnedRoomId , OwnedUserId , RoomId , UserId
21
23
} ,
@@ -26,12 +28,12 @@ use matrix_sdk::{
26
28
use matrix_sdk_ui:: {
27
29
room_list_service:: { self , RoomListLoadingState } ,
28
30
sync_service:: { self , SyncService } ,
29
- timeline:: { AnyOtherFullStateEventContent , EventTimelineItem , LiveBackPaginationStatus , RepliedToInfo , TimelineDetails , TimelineItemContent } ,
31
+ timeline:: { AnyOtherFullStateEventContent , EventTimelineItem , LiveBackPaginationStatus , RepliedToInfo , TimelineDetails , TimelineItem , TimelineItemContent } ,
30
32
Timeline ,
31
33
} ;
32
34
use tokio:: {
33
35
runtime:: Handle ,
34
- sync:: mpsc:: { UnboundedSender , UnboundedReceiver } ,
36
+ sync:: mpsc:: { UnboundedSender , UnboundedReceiver , unbounded_channel } ,
35
37
task:: JoinHandle ,
36
38
} ;
37
39
use unicode_segmentation:: UnicodeSegmentation ;
@@ -43,7 +45,7 @@ use crate::{
43
45
} , media_cache:: MediaCacheEntry , persistent_state:: { self , ClientSessionPersisted } , profile:: {
44
46
user_profile:: { AvatarState , UserProfile } ,
45
47
user_profile_cache:: { enqueue_user_profile_update, UserProfileUpdate } ,
46
- } , utils:: MEDIA_THUMBNAIL_FORMAT , verification:: add_verification_event_handlers_and_sync_client
48
+ } , shared :: search :: SearchUpdate , utils:: MEDIA_THUMBNAIL_FORMAT , verification:: add_verification_event_handlers_and_sync_client
47
49
} ;
48
50
49
51
@@ -264,9 +266,28 @@ pub enum MatrixRequest {
264
266
FullyReadReceipt {
265
267
room_id : OwnedRoomId ,
266
268
event_id : OwnedEventId ,
267
- }
269
+ } ,
270
+
271
+ /// Request to search for messages in a room.
272
+ SearchRoomMessages {
273
+ room_id : OwnedRoomId ,
274
+ search_term : String ,
275
+ next_batch : Option < String > ,
276
+ } ,
268
277
}
269
278
279
+ // Create a global sender/receiver pair for search results
280
+ pub static SEARCH_RESULTS_SENDER : OnceLock < crossbeam_channel:: Sender < SearchUpdate > > = OnceLock :: new ( ) ;
281
+ pub static SEARCH_RESULTS_RECEIVER : OnceLock < crossbeam_channel:: Receiver < SearchUpdate > > = OnceLock :: new ( ) ;
282
+
283
+ // Initialize the search channel
284
+ pub fn init_search_channel ( ) {
285
+ let ( sender, receiver) = crossbeam_channel:: unbounded ( ) ;
286
+ SEARCH_RESULTS_SENDER . set ( sender) . unwrap ( ) ;
287
+ SEARCH_RESULTS_RECEIVER . set ( receiver) . unwrap ( ) ;
288
+ }
289
+
290
+
270
291
/// Submits a request to the worker thread to be executed asynchronously.
271
292
pub fn submit_async_request ( req : MatrixRequest ) {
272
293
REQUEST_SENDER . get ( )
@@ -671,7 +692,66 @@ async fn async_worker(mut receiver: UnboundedReceiver<MatrixRequest>) -> Result<
671
692
Err ( _e) => error ! ( "Failed to send fully read receipt to room {room_id}, event {event_id}; error: {_e:?}" ) ,
672
693
}
673
694
} ) ;
674
- }
695
+ } ,
696
+
697
+ MatrixRequest :: SearchRoomMessages { room_id, search_term, next_batch } => {
698
+ let mut categories = Categories :: new ( ) ;
699
+ categories. room_events = Some ( Criteria :: new ( search_term) ) ;
700
+
701
+ let mut search_request = Request :: new ( categories) ;
702
+ search_request. next_batch = next_batch;
703
+
704
+ let client = get_client ( ) . unwrap ( ) ;
705
+
706
+ match client. send ( search_request, None ) . await {
707
+ Ok ( response) => {
708
+ let room_events = response. search_categories . room_events ;
709
+
710
+ let event_ids: Vec < OwnedEventId > = room_events
711
+ . results
712
+ . into_iter ( )
713
+ . filter_map ( |search_result| {
714
+ search_result. result
715
+ . and_then ( |raw_event| raw_event. deserialize ( ) . ok ( ) )
716
+ . and_then ( |event| match event {
717
+ AnyTimelineEvent :: MessageLike ( msg) => Some ( msg. event_id ( ) . to_owned ( ) ) ,
718
+ AnyTimelineEvent :: State ( state) => Some ( state. event_id ( ) . to_owned ( ) ) ,
719
+ _ => None ,
720
+ } )
721
+ } )
722
+ . collect ( ) ;
723
+
724
+ if event_ids. is_empty ( ) {
725
+ log ! ( "Search found no events." ) ;
726
+ return Ok ( ( ) ) ;
727
+ }
728
+
729
+ // Now that we have event IDs, request pagination of those events.
730
+ let timeline = {
731
+ let mut all_room_info = ALL_ROOM_INFO . lock ( ) . unwrap ( ) ;
732
+ let Some ( room_info) = all_room_info. get_mut ( & room_id) else {
733
+ log ! ( "Skipping search request for not-yet-known room {room_id}" ) ;
734
+ return Ok ( ( ) ) ;
735
+ } ;
736
+ room_info. timeline . clone ( )
737
+ } ;
738
+
739
+ // Fetch events by requesting timeline pagination for each found event.
740
+ let _fetch_events_task = Handle :: current ( ) . spawn ( async move {
741
+ log ! ( "Fetching {} events for search results in room {}" , event_ids. len( ) , room_id) ;
742
+
743
+ for event_id in event_ids {
744
+ let _ = timeline. fetch_details_for_event ( & event_id) . await ;
745
+ }
746
+
747
+ log ! ( "Finished fetching search results." ) ;
748
+ } ) ;
749
+ }
750
+ Err ( e) => {
751
+ error ! ( "Search request failed: {:?}" , e) ;
752
+ }
753
+ }
754
+ }
675
755
}
676
756
}
677
757
0 commit comments