Skip to content

Commit e47bc2d

Browse files
committed
feat(query): add op_type=create support and dedupe helpers (createOnly/createOrFail)
1 parent 7a484df commit e47bc2d

File tree

8 files changed

+177
-19
lines changed

8 files changed

+177
-19
lines changed

src/Eloquent/Builder.php

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ class Builder extends BaseEloquentBuilder
7171
'bucketaggregation',
7272
'openpit',
7373
'bulkinsert',
74+
'createonly',
7475
];
7576

7677
/**
@@ -95,7 +96,7 @@ public function setModel($model): static
9596
public function newModelInstance($attributes = [])
9697
{
9798
$model = $this->model->newInstance($attributes)->setConnection(
98-
$this->query->getConnection()->getName()
99+
$this->query->connection->getName()
99100
);
100101

101102
// Merge in our options.
@@ -216,7 +217,7 @@ public function hydrate(array $items)
216217
$instance = $this->newModelInstance();
217218

218219
return $instance->newCollection(array_map(function ($item) use ($instance) {
219-
return $instance->newFromBuilder($item, $this->getConnection()->getName());
220+
return $instance->newFromBuilder($item, $this->query->connection->getName());
220221
}, $items));
221222
}
222223

@@ -394,6 +395,17 @@ public function withoutRefresh()
394395
return $this->model;
395396
}
396397

398+
/**
 * Explicitly control the Elasticsearch refresh behavior for write ops.
 *
 * The value is stored under the 'refresh' key of the underlying query's options.
 *
 * @param  bool|string  $refresh  true, false, or 'wait_for'. The string spellings
 *                                'true', 'false' and '' are passed through unchanged.
 * @return static The same builder instance, for chaining.
 *
 * @throws \InvalidArgumentException When a string outside the accepted set is given.
 */
public function withRefresh(bool|string $refresh): static
{
    // Reject typos such as 'wait-for' here rather than handing an unsupported value on.
    if (is_string($refresh) && ! in_array($refresh, ['wait_for', 'true', 'false', ''], true)) {
        throw new \InvalidArgumentException(
            "Invalid refresh value '{$refresh}'. Expected true, false, or 'wait_for'."
        );
    }

    $this->query->options()->add('refresh', $refresh);

    return $this;
}
408+
397409
/**
398410
* @throws DynamicIndexException
399411
*/
@@ -644,6 +656,27 @@ public function rawDsl($dsl): array
644656
return $this->query->raw($dsl)->asArray();
645657
}
646658

659+
/**
 * Switch inserts issued through this builder to op_type=create (dedupe semantics).
 *
 * Records 'insert_op_type' => 'create' in the underlying query's options.
 * With that set, creating an _id that already exists is expected to fail
 * with a 409 from Elasticsearch.
 *
 * @return static The same builder instance, for chaining.
 */
public function createOnly(): static
{
    // The flag lives on the query options, next to the other per-query settings.
    $options = $this->query->options();
    $options->add('insert_op_type', 'create');

    return $this;
}
670+
671+
/**
 * Shorthand for createOnly() followed by create().
 *
 * NOTE(review): the original description says a duplicate _id surfaces as an
 * exception (409) and that an array of documents is also accepted; neither is
 * visible from this method alone - confirm against create().
 *
 * @param  array  $attributes  Handed to create() untouched.
 * @return mixed Whatever create() returns.
 */
public function createOrFail(array $attributes)
{
    $builder = $this->createOnly();

    return $builder->create($attributes);
}
679+
647680
// ----------------------------------------------------------------------
648681
// Protected
649682
// ----------------------------------------------------------------------

src/Eloquent/Docs/ModelDocs.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,8 @@
181181
* @method static array getModels($columns = ['*'])
182182
* @method static ElasticCollection get($columns = ['*'])
183183
* @method static ElasticCollection insert($values, $returnData = null)
184+
* @method static self createOnly()
185+
* @method static self createOrFail(array $attributes)
184186
*-----------------------------------
185187
* @method static array toDsl($columns = ['*'])
186188
* @method static array toSql($columns = ['*'])

src/Exceptions/BulkInsertQueryException.php

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ class BulkInsertQueryException extends LaravelElasticsearchException
1515
*/
1616
public function __construct(Elasticsearch $queryResult)
1717
{
18-
parent::__construct($this->formatMessage($queryResult->asArray()), 400);
18+
$result = $queryResult->asArray();
19+
parent::__construct($this->formatMessage($result), $this->inferStatusCode($result));
1920
}
2021

2122
/**
@@ -30,12 +31,16 @@ private function formatMessage(array $result): string
3031
// Clean that ish up.
3132
$items = collect($result['items'] ?? [])
3233
->filter(function (array $item) {
33-
return $item['index'] && ! empty($item['index']['error']);
34+
$action = array_key_first($item) ?? 'index';
35+
36+
return isset($item[$action]) && ! empty($item[$action]['error']);
3437
})
3538
->map(function (array $item) {
36-
return $item['index'];
39+
$action = array_key_first($item) ?? 'index';
40+
41+
return $item[$action];
3742
})
38-
// reduce to max limit
43+
// reduce to max limit
3944
->slice(0, $this->errorLimit)
4045
->values();
4146

@@ -44,11 +49,28 @@ private function formatMessage(array $result): string
4449
$message->push('Bulk Insert Errors ('.'Showing '.$items->count().' of '.$totalErrors->count().'):');
4550

4651
$items = $items->map(function (array $item) {
47-
return "{$item['_id']}: {$item['error']['reason']}";
52+
$id = $item['_id'] ?? 'unknown';
53+
$reason = $item['error']['reason'] ?? 'unknown error';
54+
$type = $item['error']['type'] ?? 'error';
55+
56+
return "$id: [$type] $reason";
4857
})->values()->toArray();
4958

5059
$message->push(...$items);
5160

5261
return $message->implode(PHP_EOL);
5362
}
63+
64+
/**
 * Pick the exception code for a bulk response.
 *
 * Scans the response's 'items'; each item is keyed by its bulk action name
 * (falling back to 'index' when the item has no key).
 *
 * @param  array  $result  Bulk response as an array.
 * @return int 409 if any item's error type is 'version_conflict_engine_exception', otherwise 400.
 */
private function inferStatusCode(array $result): int
{
    $entries = $result['items'] ?? [];

    foreach ($entries as $entry) {
        $operation = array_key_first($entry) ?? 'index';
        $failure = $entry[$operation]['error'] ?? null;

        // Items without a structured error cannot be a conflict.
        if (! is_array($failure)) {
            continue;
        }

        if (($failure['type'] ?? '') === 'version_conflict_engine_exception') {
            return 409;
        }
    }

    return 400;
}
5476
}

src/Query/DSL/DslBuilder.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,9 @@ public function setFields(array $fields): self
107107

108108
/**
109109
* Set a refresh parameter
110+
* Accepts: true, false, or 'wait_for'
110111
*/
111-
public function setRefresh(bool $refresh = true): self
112+
public function setRefresh(bool|string $refresh = true): self
112113
{
113114
return $this->set(['refresh'], $refresh);
114115
}

src/Query/DSL/DslFactory.php

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,17 @@ public static function indexOperation(string $index, mixed $id = null, array $op
1919
return ['index' => $operation];
2020
}
2121

22+
/**
 * Build the action line for a bulk "create" operation.
 *
 * @param  string  $index  Target index, stored as '_index'.
 * @param  mixed  $id  Document id; '_id' is only set when this is not null.
 * @param  array  $options  Extra action metadata, merged over the '_index' entry.
 * @return array The metadata wrapped under the 'create' key.
 */
public static function createOperation(string $index, mixed $id = null, array $options = []): array
{
    $action = array_merge(['_index' => $index], $options);

    // Without an id the action carries no '_id' entry at all.
    if ($id === null) {
        return ['create' => $action];
    }

    $action['_id'] = $id;

    return ['create' => $action];
}
32+
2233
// ----------------------------------------------------------------------
2334
// Query
2435
// ----------------------------------------------------------------------

src/Query/Grammar.php

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public function compileInsert($query, array $values): array
6666

6767
// Prepare main document operation options
6868
$options = [];
69+
$opType = null;
6970

7071
// Handle routing
7172
if (isset($doc['_routing'])) {
@@ -83,6 +84,18 @@ public function compileInsert($query, array $values): array
8384
unset($doc['_parent']);
8485
}
8586

87+
// A per-document '_op_type' (or 'op_type') key selects the operation and is removed from the stored document
88+
if (isset($doc['_op_type'])) {
89+
$opType = $doc['_op_type'];
90+
unset($doc['_op_type']);
91+
} elseif (isset($doc['op_type'])) {
92+
$opType = $doc['op_type'];
93+
unset($doc['op_type']);
94+
} else {
95+
// Otherwise fall back to the builder-level 'insert_op_type' query option
96+
$opType = $query->getOption('insert_op_type', null);
97+
}
98+
8699
// We don't want to save the ID as part of the doc
87100
// Unless the Model has explicitly set 'storeIdsInDocument'
88101
if ($query->getOption('store_ids_in_document', false)) {
@@ -92,12 +105,20 @@ public function compileInsert($query, array $values): array
92105
unset($doc['id'], $doc['_id']);
93106
}
94107

95-
// Add the document index operation
96-
$index = DslFactory::indexOperation(
97-
index: $query->getFrom(),
98-
id: $docId,
99-
options: $options
100-
);
108+
// Add the document operation (index or create)
109+
if ($opType && strtolower((string) $opType) === 'create') {
110+
$index = DslFactory::createOperation(
111+
index: $query->getFrom(),
112+
id: $docId,
113+
options: $options
114+
);
115+
} else {
116+
$index = DslFactory::indexOperation(
117+
index: $query->getFrom(),
118+
id: $docId,
119+
options: $options
120+
);
121+
}
101122
$dsl->appendBody($index);
102123

103124
// Process document properties to ensure proper formatting

src/Query/Processor.php

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -446,16 +446,17 @@ public function processBulkInsert(Builder $query, Elasticsearch $result): array
446446
];
447447
if (! empty($process['items'])) {
448448
foreach ($process['items'] as $item) {
449-
if (! empty($item['index']['error'])) {
449+
$action = array_key_first($item) ?? 'index';
450+
if (! empty($item[$action]['error'])) {
450451
$outcome['errors'][] = [
451-
'id' => $item['index']['_id'],
452-
'type' => $item['index']['error']['type'],
453-
'reason' => $item['index']['error']['reason'],
452+
'id' => $item[$action]['_id'] ?? null,
453+
'type' => $item[$action]['error']['type'] ?? null,
454+
'reason' => $item[$action]['error']['reason'] ?? null,
454455
];
455456
$outcome['failed']++;
456457
} else {
457458
$outcome['success']++;
458-
if ($item['index']['status'] == 201) {
459+
if (($item[$action]['status'] ?? 200) == 201) {
459460
$outcome['created']++;
460461
} else {
461462
$outcome['modified']++;

tests/CreateOpTypeTest.php

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
use PDPhilip\Elasticsearch\Exceptions\BulkInsertQueryException;
6+
use PDPhilip\Elasticsearch\Tests\Models\User;
7+
8+
beforeEach(function () {
9+
User::executeSchema();
10+
});
11+
12+
it('creates a document with createOnly and rejects duplicates', function () {
    $id = 'dataset:check-1:2025-01-01T00:00:00Z';

    // First create should succeed
    User::query()
        ->createOnly()
        ->withRefresh('wait_for')
        ->create([
            'id' => $id,
            'name' => 'First Insert',
            'title' => 'admin',
            'age' => 30,
        ]);

    $found = User::find($id);
    expect($found)->not()->toBeNull();
    expect($found->id)->toBe($id);

    // Second create with the same _id must fail. Capture the exception so that
    // both its class and its 409 conflict code can be asserted.
    $caught = null;

    try {
        User::query()
            ->createOnly()
            ->create([
                'id' => $id,
                'name' => 'Second Insert',
                'title' => 'user',
                'age' => 31,
            ]);
    } catch (BulkInsertQueryException $e) {
        $caught = $e;
    }

    expect($caught)->toBeInstanceOf(BulkInsertQueryException::class);
    expect($caught->getCode())->toBe(409);
});
42+
43+
it('supports per-document op_type via attribute', function () {
    $documentId = 'dataset:check-2:2025-01-01T00:00:00Z';

    // The op type travels with the document itself instead of the builder.
    $firstPayload = [
        'id' => $documentId,
        '_op_type' => 'create',
        'name' => 'Doc Create',
        'title' => 'admin',
        'age' => 42,
    ];
    User::create($firstPayload);

    $stored = User::find($documentId);
    expect($stored)->not()->toBeNull();
    expect($stored->id)->toBe($documentId);

    // Re-using the same id with op type "create" has to be rejected.
    $duplicatePayload = [
        'id' => $documentId,
        '_op_type' => 'create',
        'name' => 'Doc Create Duplicate',
    ];

    expect(fn () => User::create($duplicatePayload))
        ->toThrow(BulkInsertQueryException::class);
});

0 commit comments

Comments
 (0)