1
1
using System . Text . Json ;
2
2
using System . Text . Json . Nodes ;
3
+ using System . Threading . Tasks ;
3
4
using Azure . Core ;
4
5
using Microsoft . Azure . Functions . Worker ;
5
6
using Microsoft . Extensions . Logging ;
@@ -54,14 +55,16 @@ public async Async.Task Run(
54
55
return ;
55
56
}
56
57
58
+ var storageAccount = new ResourceIdentifier ( topicElement . GetString ( ) ! ) ;
59
+
57
60
try {
58
61
// Setting isLastRetryAttempt to false will rethrow any exceptions
59
62
// With the intention that the azure functions runtime will handle requeing
60
63
// the message for us. The difference is for the poison queue, we're handling the
61
64
// requeuing ourselves because azure functions doesn't support retry policies
62
65
// for queue based functions.
63
66
64
- var result = await FileAdded ( fileChangeEvent , isLastRetryAttempt : false ) ;
67
+ var result = await FileAdded ( storageAccount , fileChangeEvent , isLastRetryAttempt : false ) ;
65
68
if ( ! result . IsOk && result . ErrorV . Code == ErrorCode . ADO_WORKITEM_PROCESSING_DISABLED ) {
66
69
await RequeueMessage ( msg , TimeSpan . FromDays ( 1 ) ) ;
67
70
}
@@ -71,16 +74,47 @@ public async Async.Task Run(
71
74
}
72
75
}
73
76
74
- private async Async . Task < OneFuzzResultVoid > FileAdded ( JsonDocument fileChangeEvent , bool isLastRetryAttempt ) {
77
+ private async Async . Task < OneFuzzResultVoid > FileAdded ( ResourceIdentifier storageAccount , JsonDocument fileChangeEvent , bool isLastRetryAttempt ) {
75
78
var data = fileChangeEvent . RootElement . GetProperty ( "data" ) ;
76
79
var url = data . GetProperty ( "url" ) . GetString ( ) ! ;
77
80
var parts = url . Split ( "/" ) . Skip ( 3 ) . ToList ( ) ;
78
81
79
- var container = parts [ 0 ] ;
82
+ var container = Container . Parse ( parts [ 0 ] ) ;
80
83
var path = string . Join ( '/' , parts . Skip ( 1 ) ) ;
81
84
82
- _log . LogInformation ( "file added : {Container} - {Path}" , container , path ) ;
83
- return await _notificationOperations . NewFiles ( Container . Parse ( container ) , path , isLastRetryAttempt ) ;
85
+ _log . LogInformation ( "file added : {Container} - {Path}" , container . String , path ) ;
86
+
87
+ var ( _, result ) = await (
88
+ ApplyRetentionPolicy ( storageAccount , container , path ) ,
89
+ _notificationOperations . NewFiles ( container , path , isLastRetryAttempt ) ) ;
90
+
91
+ return result ;
92
+ }
93
+
94
+ private async Async . Task < bool > ApplyRetentionPolicy ( ResourceIdentifier storageAccount , Container container , string path ) {
95
+ if ( await _context . FeatureManagerSnapshot . IsEnabledAsync ( FeatureFlagConstants . EnableContainerRetentionPolicies ) ) {
96
+ // default retention period can be applied to the container
97
+ // if one exists, we will set the expiry date on the newly-created blob, if it doesn't already have one
98
+ var account = await _storage . GetBlobServiceClientForAccount ( storageAccount ) ;
99
+ var containerClient = account . GetBlobContainerClient ( container . String ) ;
100
+ var containerProps = await containerClient . GetPropertiesAsync ( ) ;
101
+ var retentionPeriod = RetentionPolicyUtils . GetContainerRetentionPeriodFromMetadata ( containerProps . Value . Metadata ) ;
102
+ if ( ! retentionPeriod . IsOk ) {
103
+ _log . LogError ( "invalid retention period: {Error}" , retentionPeriod . ErrorV ) ;
104
+ } else if ( retentionPeriod . OkV is TimeSpan period ) {
105
+ var blobClient = containerClient . GetBlobClient ( path ) ;
106
+ var tags = ( await blobClient . GetTagsAsync ( ) ) . Value . Tags ;
107
+ var expiryDate = DateTime . UtcNow + period ;
108
+ var tag = RetentionPolicyUtils . CreateExpiryDateTag ( DateOnly . FromDateTime ( expiryDate ) ) ;
109
+ if ( tags . TryAdd ( tag . Key , tag . Value ) ) {
110
+ _ = await blobClient . SetTagsAsync ( tags ) ;
111
+ _log . LogInformation ( "applied container retention policy ({Policy}) to {Path}" , period , path ) ;
112
+ return true ;
113
+ }
114
+ }
115
+ }
116
+
117
+ return false ;
84
118
}
85
119
86
120
private async Async . Task RequeueMessage ( string msg , TimeSpan ? visibilityTimeout = null ) {
0 commit comments