Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions Tests/Transport/ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -403,9 +403,9 @@ public function providePlatformSql(): iterable
}

/**
* @dataProvider setupIndicesProvider
* @dataProvider setupIndexesProvider
*/
public function testSetupIndices(string $platformClass, array $expectedIndices)
public function testSetupIndexes(string $platformClass, array $expectedIndexes, array $options = [])
{
$driverConnection = $this->createMock(DBALConnection::class);
$driverConnection->method('getConfiguration')->willReturn(new Configuration());
Expand All @@ -416,10 +416,10 @@ public function testSetupIndices(string $platformClass, array $expectedIndices)
$expectedTable->addColumn('id', Types::BIGINT);
$expectedTable->setPrimaryKey(['id']);
// Make sure columns for indices exists so addIndex() will not throw
foreach (array_unique(array_merge(...$expectedIndices)) as $columnName) {
foreach (array_unique(array_merge(...$expectedIndexes)) as $columnName) {
$expectedTable->addColumn($columnName, Types::STRING);
}
foreach ($expectedIndices as $indexColumns) {
foreach ($expectedIndexes as $indexColumns) {
$expectedTable->addIndex($indexColumns);
}
$schemaManager->method('createSchema')->willReturn($schema);
Expand All @@ -443,13 +443,19 @@ public function testSetupIndices(string $platformClass, array $expectedIndices)
$connection->setup();
}

public function setupIndicesProvider(): iterable
public function setupIndexesProvider(): iterable
{
yield 'MySQL' => [
MySQL57Platform::class,
[['delivered_at']],
];

yield 'MySQL with forced index' => [
MySQL57Platform::class,
[['queue_name'], ['available_at'], ['delivered_at']],
['force_indexes_creation' => true],
];

yield 'Other platforms' => [
AbstractPlatform::class,
[['queue_name'], ['available_at'], ['delivered_at']],
Expand Down
4 changes: 3 additions & 1 deletion Transport/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Connection implements ResetInterface
'queue_name' => 'default',
'redeliver_timeout' => 3600,
'auto_setup' => true,
'force_indexes_creation' => false,
];

/**
Expand All @@ -56,6 +57,7 @@ class Connection implements ResetInterface
* * queue_name: name of the queue
* * redeliver_timeout: Timeout before redeliver messages still in handling state (i.e: delivered_at is not null and message is still in table). Default: 3600
* * auto_setup: Whether the table should be created automatically during send / get. Default: true
* * force_indexes_creation: Whether indexes should be created also on mysql
*/
protected $configuration = [];
protected $driverConnection;
Expand Down Expand Up @@ -413,7 +415,7 @@ private function addTableToSchema(Schema $schema): void
->setNotnull(false);
$table->setPrimaryKey(['id']);
// No indices on queue_name and available_at on MySQL to prevent deadlock issues when running multiple consumers.
if (!$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) {
if ($this->configuration['force_indexes_creation'] || !$this->driverConnection->getDatabasePlatform() instanceof MySqlPlatform) {
$table->addIndex(['queue_name']);
$table->addIndex(['available_at']);
}
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
"doctrine/dbal": "^2.13|^3.0",
"doctrine/persistence": "^1.3|^2",
"symfony/property-access": "^4.4|^5.0|^6.0",
"symfony/serializer": "^4.4|^5.0|^6.0"
"symfony/serializer": "^4.4|^5.0|^6.0",
"symfony/phpunit-bridge": "^5.2"
},
"conflict": {
"doctrine/dbal": "<2.13",
Expand Down