Skip to content

Commit

Permalink
Merge pull request #35 from DS4SD/add-target-options
Browse files Browse the repository at this point in the history
Add export target options for document conversion
  • Loading branch information
dolfim-ibm authored Aug 25, 2022
2 parents 76f4b54 + 9278489 commit 338de56
Show file tree
Hide file tree
Showing 6 changed files with 277 additions and 153 deletions.
38 changes: 28 additions & 10 deletions deepsearch/documents/core/convert.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import glob
import logging
import os
import pathlib
from enum import Enum
from pathlib import Path
from typing import Any, List, Optional
from typing import Any, Dict, List, Literal, Optional

import requests
import urllib3
from pydantic import BaseModel, Field
from tqdm import tqdm

from deepsearch.cps.apis import public as sw_client
Expand All @@ -14,16 +18,24 @@
from deepsearch.cps.client.api import CpsApi

from .common_routines import ERROR_MSG, progressbar
from .models import ExportTarget, ZipTarget
from .utils import URLNavigator, collect_all_local_files, download_url

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
logger = logging.getLogger(__name__)


def make_payload(url_document: str, collection_name: str = "_default"):
def make_payload(
url_document: str,
target: Optional[ExportTarget],
collection_name: str = "_default",
):
"""
Create payload for requesting conversion
"""

target = target or ZipTarget()

payload = {
"source": {
"type": "url",
Expand All @@ -33,7 +45,7 @@ def make_payload(url_document: str, collection_name: str = "_default"):
"collection_name": collection_name,
"keep_documents": "false",
},
"target": {"type": "zip", "content_type": "json", "add_cells": "true"},
"target": target.dict(),
}
return payload

Expand Down Expand Up @@ -65,9 +77,7 @@ def get_ccs_project_key(api: CpsApi, cps_proj_key: str):


def submit_url_for_conversion(
api: CpsApi,
cps_proj_key: str,
url: str,
api: CpsApi, cps_proj_key: str, url: str, target: Optional[ExportTarget]
) -> str:
"""
Convert an online pdf using DeepSearch Technology.
Expand All @@ -77,7 +87,7 @@ def submit_url_for_conversion(
api=api, cps_proj_key=cps_proj_key
)
# submit conversion request
payload = make_payload(url, collection_name)
payload = make_payload(url, target, collection_name)

try:
request_conversion_task_id = api.client.session.post(
Expand All @@ -99,6 +109,7 @@ def send_files_for_conversion(
api: CpsApi,
cps_proj_key: str,
source_path: Path,
target: Optional[ExportTarget],
root_dir: Path,
progress_bar=False,
) -> list:
Expand Down Expand Up @@ -126,7 +137,10 @@ def send_files_for_conversion(
)
# submit url for conversion
task_id = submit_url_for_conversion(
api=api, cps_proj_key=cps_proj_key, url=private_download_url
api=api,
cps_proj_key=cps_proj_key,
url=private_download_url,
target=target,
)
task_ids.append(task_id)
progress.update(1)
Expand Down Expand Up @@ -255,7 +269,11 @@ def upload_single_file(api: CpsApi, cps_proj_key: str, source_path: Path) -> str


def send_urls_for_conversion(
api: CpsApi, cps_proj_key: str, urls: List[str], progress_bar=False
api: CpsApi,
cps_proj_key: str,
urls: List[str],
target: Optional[ExportTarget],
progress_bar=False,
) -> List[Any]:
"""
Send multiple online documents for conversion.
Expand All @@ -271,7 +289,7 @@ def send_urls_for_conversion(
) as progress:
for url in urls:
task_id = submit_url_for_conversion(
api=api, cps_proj_key=cps_proj_key, url=url
api=api, cps_proj_key=cps_proj_key, url=url, target=target
)
task_ids.append(task_id)
progress.update(1)
Expand Down
22 changes: 18 additions & 4 deletions deepsearch/documents/core/input_process.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import os
import tempfile
from pathlib import Path
from typing import List
from typing import List, Optional

import urllib3

from deepsearch.cps.client.api import CpsApi
from deepsearch.cps.client.components.documents import DocumentConversionResult

from .models import ExportTarget
from .utils import batch_single_files

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
Expand All @@ -20,7 +21,11 @@


def process_local_input(
api: CpsApi, cps_proj_key: str, source_path: Path, progress_bar=False
api: CpsApi,
cps_proj_key: str,
source_path: Path,
target: Optional[ExportTarget],
progress_bar=False,
) -> DocumentConversionResult:
"""
Classify the user provided local input and take appropriate action.
Expand All @@ -38,6 +43,7 @@ def process_local_input(
api=api,
cps_proj_key=cps_proj_key,
source_path=source_path,
target=target,
root_dir=Path(tmpdir),
progress_bar=progress_bar,
)
Expand All @@ -58,13 +64,21 @@ def process_local_input(


def process_urls_input(
api: CpsApi, cps_proj_key: str, urls: List[str], progress_bar=False
api: CpsApi,
cps_proj_key: str,
urls: List[str],
target: Optional[ExportTarget],
progress_bar=False,
):
"""
Classify user provided url(s) and take appropriate action.
"""
task_ids = send_urls_for_conversion(
api=api, cps_proj_key=cps_proj_key, urls=urls, progress_bar=progress_bar
api=api,
cps_proj_key=cps_proj_key,
urls=urls,
target=target,
progress_bar=progress_bar,
)
statuses = check_status_running_tasks(
api=api, cps_proj_key=cps_proj_key, task_ids=task_ids, progress_bar=progress_bar
Expand Down
8 changes: 8 additions & 0 deletions deepsearch/documents/core/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
process_local_input,
process_urls_input,
)
from deepsearch.documents.core.models import ExportTarget

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

Expand All @@ -17,6 +18,7 @@ def convert_documents(
api: CpsApi,
urls: Optional[Union[str, List[str]]] = None,
source_path: Optional[Path] = None,
target: Optional[ExportTarget] = None,
progress_bar=False,
):
"""
Expand All @@ -34,6 +36,10 @@ def convert_documents(
For converting local files, please provide absolute path to file or to directory
containing multiple files.
target : deepsearch.documents.core.models.ExportTarget [OPTIONAL]
Specify the target to which the documents should be exported. Available options:
ZIP file (default), Elasticsearch index, MongoDB collection.
progress_bar : Boolean (default is False in code, True in CLI)
Show progress bar for processing, submitting, converting input and
downloading converted document.
Expand All @@ -53,13 +59,15 @@ def convert_documents(
api=api,
cps_proj_key=proj_key,
urls=urls,
target=target,
progress_bar=progress_bar,
)
elif urls is None and source_path is not None:
return process_local_input(
api=api,
cps_proj_key=proj_key,
source_path=Path(source_path).expanduser().resolve(),
target=target,
progress_bar=progress_bar,
)

Expand Down
117 changes: 117 additions & 0 deletions deepsearch/documents/core/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
from enum import Enum
from typing import List, Literal, Optional, Set, Union

from pydantic import BaseModel, Field


class S3Coordinates(BaseModel):
    """Connection settings for an S3-compatible object store used as export storage."""

    # Endpoint of the S3 service.
    host: str
    port: int

    # Use SSL for the connection, and whether to verify the server certificate.
    ssl: bool
    verify_ssl: bool

    # Credentials for the service.
    access_key: str
    secret_key: str

    # Bucket and object placement. NOTE(review): `location` is presumably the
    # S3 region and `key_prefix` a path prefix for written objects — confirm
    # against the server-side consumer of this payload.
    bucket: str
    location: str
    key_prefix: str = ""

    # Optional alternative endpoint; purpose not visible from this file —
    # TODO confirm (e.g. externally reachable URL for returned links).
    external_endpoint: Optional[str] = None


class DocumentExistsInTargetAction(str, Enum):
    """
    What to do if the document already exists on the target.

    - `replace` will replace the document, destroying any external modifications.
    - `merge` will try to merge the updated contents with the already-present document.
    - `skip` will not touch the document on the target, leaving it as-is.
      Using `skip` improves performance; however, if the document is modified
      externally, CCS will not update it back to the original state.
    """

    REPLACE = "replace"
    MERGE = "merge"
    SKIP = "skip"


class MongoCollectionCoordinates(BaseModel):
    """Coordinates identifying a single MongoDB collection."""

    # Connection URI of the MongoDB deployment.
    uri: str
    # Database and collection names within that deployment.
    database: str
    collection: str


class MongoS3TargetCoordinates(BaseModel):
    """Coordinates to a Mongo collection, and optionally, an S3 bucket"""

    # Target MongoDB collection.
    mongo: MongoCollectionCoordinates
    # Optional S3 bucket. The explicit `= None` default keeps this field
    # optional under pydantic v2 as well, where a bare `Optional[X]`
    # annotation declares a *required* (nullable) field; on pydantic v1 the
    # behavior is identical to the implicit default.
    s3: Optional[S3Coordinates] = None


class MongoS3Target(BaseModel):
    """Export target that writes converted documents to MongoDB (optionally with S3)."""

    # Discriminator value sent in the conversion request payload.
    type: Literal["mongo_s3"] = "mongo_s3"

    # Coordinates for the export
    coordinates: MongoS3TargetCoordinates

    # Conflict policy when a document already exists on the target;
    # merging is the default (see DocumentExistsInTargetAction).
    if_document_exists: DocumentExistsInTargetAction = (
        DocumentExistsInTargetAction.MERGE
    )


class ZipPackageContentType(str, Enum):
    """Specify the content type for the documents in the Zip file."""

    # Export each document as JSON.
    JSON = "json"
    # Export each document as HTML.
    HTML = "html"


class ZipTarget(BaseModel):
    """
    Specify how the documents should be exported to a Zip file.
    If the [coordinates] are not specified, the project's coordinates
    will be used.

    NOTE(review): this model declares no `coordinates` field, so with this
    model the project's coordinates are always used — confirm whether the
    server-side target accepts explicit coordinates.
    """

    # Discriminator value sent in the conversion request payload.
    type: Literal["zip"] = "zip"

    # Format of each document inside the archive.
    content_type: ZipPackageContentType = ZipPackageContentType.JSON

    # Presumably controls whether parsed document cells are included in the
    # export — TODO confirm against the conversion API.
    add_cells: bool = True


class ElasticIndexCoordinates(BaseModel):
    """Coordinates identifying an Elasticsearch index."""

    # One or more Elasticsearch host URLs.
    hosts: List[str]
    # Disable TLS certificate validation (unsafe; intended for self-signed setups).
    dangerously_disable_ssl_validation: bool = False
    # Optional CA certificate, base64-encoded, used to validate the server.
    ca_certificate_base64: Optional[str] = None
    # Name of the index to write documents into.
    index: str


class ElasticS3TargetCoordinates(BaseModel):
    """Coordinates to an Elasticsearch index, and optionally, an S3 bucket."""

    # Target Elasticsearch index.
    elastic: ElasticIndexCoordinates
    # Optional S3 bucket. The explicit `= None` default keeps this field
    # optional under pydantic v2 as well, where a bare `Optional[X]`
    # annotation declares a *required* (nullable) field; on pydantic v1 the
    # behavior is identical to the implicit default.
    s3: Optional[S3Coordinates] = None


class ElasticS3Target(BaseModel):
    """Export target that writes converted documents to Elasticsearch (optionally with S3)."""

    # Discriminator value sent in the conversion request payload.
    type: Literal["elastic_s3"] = "elastic_s3"

    # Coordinates for the export.
    coordinates: ElasticS3TargetCoordinates

    # Conflict policy when a document already exists on the target;
    # merging is the default (see DocumentExistsInTargetAction).
    if_document_exists: DocumentExistsInTargetAction = (
        DocumentExistsInTargetAction.MERGE
    )

    # Renaming `$ref` to `__ref` avoids MongoDB's restriction on `$`-prefixed
    # field names (see the field description).
    escape_ref_fields: bool = Field(
        default=True,
        description="If true, `$ref` fields are renamed to `__ref`. This allows the data to then be written into a MongoDB collection.",
    )


# Union of all supported export targets, accepted (wrapped in Optional) by the
# conversion entry points; when no target is given, ZipTarget is used as the
# default.
ExportTarget = Union[
    ZipTarget,
    MongoS3Target,
    ElasticS3Target,
]
Loading

0 comments on commit 338de56

Please sign in to comment.