diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/DataSourceConfig.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/DataSourceConfig.java index 723a4a6d..6ee5252d 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/config/DataSourceConfig.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/config/DataSourceConfig.java @@ -4,9 +4,11 @@ import org.springframework.batch.core.repository.JobRepository; import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean; import org.springframework.beans.factory.annotation.Value; +import org.springframework.cloud.client.loadbalancer.LoadBalanced; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.web.client.RestTemplate; import javax.sql.DataSource; @@ -31,5 +33,12 @@ public JobRepository roachRepository(DataSource dataSource, PlatformTransactionM factory.setMaxVarCharLength(1000); return factory.getObject(); } + + @LoadBalanced + @Bean + public RestTemplate credentialTemplate(){ + RestTemplate credenTemplate = new RestTemplate(); + return credenTemplate; + } } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java index e465a825..87ed892c 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/constant/ODSConstants.java @@ -17,6 +17,10 @@ public class ODSConstants { public static final String FILE_SIZE = "fileSize"; public static final String FILE_PATH = "filePath"; public static final String FILE_ID = "file_id"; + + public static final String DROPBOX= "dropbox"; + public static final String GOOGLEDRIVE = "gdrive"; + public static final String BOX= "box"; public static final String TIME = "time"; public static final String SOURCE_ACCOUNT_ID_PASS = "sourceAccountIdPass"; public static final String SOURCE_HOST = "sourceURI"; diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/EndpointCredential.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/EndpointCredential.java index 3a029fdb..fd3b54fe 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/EndpointCredential.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/model/credential/EndpointCredential.java @@ -1,8 +1,12 @@ package org.onedatashare.transferservice.odstransferservice.model.credential; +import lombok.Getter; + /** * Base class for storing one user credential */ + +@Getter public class EndpointCredential{ protected String accountId; diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/AuthenticateCredentials.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/AuthenticateCredentials.java new file mode 100644 index 00000000..82c2788c --- /dev/null +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/AuthenticateCredentials.java @@ -0,0 +1,22 @@ +package org.onedatashare.transferservice.odstransferservice.service; + +import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential; +import org.springframework.stereotype.Service; +import org.springframework.web.client.RestTemplate; + + +@Service +public class AuthenticateCredentials { + RestTemplate credentialTemplate; + String baseUrl = "http://EndPointCredentialService/{}/{}/{}"; + + public AuthenticateCredentials(RestTemplate credentialTemplate){ + this.credentialTemplate = credentialTemplate; + } + public OAuthEndpointCredential checkExpiryAndGenerateNew(String userId, String type, String accountId){ + String endpoint = baseUrl.replaceFirst("\\{}", userId) + .replaceFirst("\\{}", type) + .replaceFirst("\\{}", accountId); + return this.credentialTemplate.getForObject(endpoint, OAuthEndpointCredential.class); + } +} diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java index 13d6092c..33dcefe1 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/JobControl.java @@ -105,6 +105,9 @@ public class JobControl extends DefaultBatchConfigurer { @Autowired JobRepository roachRepository; + @Autowired + AuthenticateCredentials authenticateCredentials; + @Lazy @Bean @@ -161,18 +164,18 @@ protected AbstractItemCountingItemStreamItemReader getRightReader(End amazonS3Reader.setPool(connectionBag.getS3ReaderPool()); return amazonS3Reader; case box: - BoxReader boxReader = new BoxReader(request.getSource().getOauthSourceCredential(), fileInfo); + BoxReader boxReader = new BoxReader(request.getSource().getOauthSourceCredential(), fileInfo,this.authenticateCredentials); boxReader.setMaxRetry(this.request.getOptions().getRetry()); return boxReader; case dropbox: - DropBoxReader dropBoxReader = new DropBoxReader(request.getSource().getOauthSourceCredential(), fileInfo); + DropBoxReader dropBoxReader = new DropBoxReader(request.getSource().getOauthSourceCredential(), fileInfo, this.authenticateCredentials); return dropBoxReader; case scp: SCPReader reader = new SCPReader(fileInfo); reader.setPool(connectionBag.getSftpReaderPool()); return reader; case gdrive: - GDriveReader dDriveReader = new GDriveReader(request.getSource().getOauthSourceCredential(), fileInfo); + GDriveReader dDriveReader = new GDriveReader(request.getSource().getOauthSourceCredential(), fileInfo, this.authenticateCredentials); return dDriveReader; } return null; @@ -208,11 +211,11 @@ protected ItemWriter getRightWriter(EndpointType type, EntityInfo fil BoxWriterSmallFile boxWriterSmallFile = new BoxWriterSmallFile(request.getDestination().getOauthDestCredential(), fileInfo, this.metricsCollector, this.influxCache); return boxWriterSmallFile; } else { - BoxWriterLargeFile boxWriterLargeFile = new BoxWriterLargeFile(request.getDestination().getOauthDestCredential(), fileInfo, this.metricsCollector, this.influxCache); + BoxWriterLargeFile boxWriterLargeFile = new BoxWriterLargeFile(request.getDestination().getOauthDestCredential(), fileInfo, this.metricsCollector, this.influxCache, this.authenticateCredentials); return boxWriterLargeFile; } case dropbox: - DropBoxChunkedWriter dropBoxChunkedWriter = new DropBoxChunkedWriter(request.getDestination().getOauthDestCredential(), this.metricsCollector, this.influxCache); + DropBoxChunkedWriter dropBoxChunkedWriter = new DropBoxChunkedWriter(request.getDestination().getOauthDestCredential(), this.metricsCollector, this.influxCache, this.authenticateCredentials); return dropBoxChunkedWriter; case scp: SCPWriter scpWriter = new SCPWriter(fileInfo, this.metricsCollector, this.influxCache); @@ -223,7 +226,7 @@ protected ItemWriter getRightWriter(EndpointType type, EntityInfo fil GDriveSimpleWriter writer = new GDriveSimpleWriter(request.getDestination().getOauthDestCredential(),fileInfo); return writer; }else{ - GDriveResumableWriter writer = new GDriveResumableWriter(request.getDestination().getOauthDestCredential(),fileInfo); + GDriveResumableWriter writer = new GDriveResumableWriter(request.getDestination().getOauthDestCredential(),fileInfo, this.authenticateCredentials); writer.setPool(connectionBag.getGoogleDriveWriterPool()); return writer; } diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/box/BoxReader.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/box/BoxReader.java index ae58c6dc..64e8ace7 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/box/BoxReader.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/box/BoxReader.java @@ -2,10 +2,13 @@ import com.box.sdk.BoxAPIConnection; import com.box.sdk.BoxFile; +import com.box.sdk.BoxUser; +import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants; import org.onedatashare.transferservice.odstransferservice.model.DataChunk; import org.onedatashare.transferservice.odstransferservice.model.EntityInfo; import org.onedatashare.transferservice.odstransferservice.model.FilePart; import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential; +import org.onedatashare.transferservice.odstransferservice.service.AuthenticateCredentials; import org.onedatashare.transferservice.odstransferservice.service.FilePartitioner; import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility; import org.slf4j.Logger; @@ -17,6 +20,8 @@ import java.io.ByteArrayOutputStream; +import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.OWNER_ID; + public class BoxReader extends AbstractItemCountingItemStreamItemReader { private OAuthEndpointCredential credential; @@ -27,14 +32,24 @@ public class BoxReader extends AbstractItemCountingItemStreamItemReader { - private final OAuthEndpointCredential credential; + private OAuthEndpointCredential credential; private BoxAPIConnection boxAPIConnection; EntityInfo fileInfo; private HashMap fileMap; @@ -42,8 +46,11 @@ public class BoxWriterLargeFile extends ODSBaseWriter implements ItemWriter(); this.parts = new ArrayList<>(); this.credential = oAuthDestCredential; + this.authenticateCredentials = authenticateCredentials; + BoxUser user = new BoxUser(this.boxAPIConnection, this.credential.getAccountId()); + BoxUser.Info userInfo = user.getInfo("login"); + this.usersEmail = userInfo.getLogin(); } @BeforeStep @@ -58,6 +69,7 @@ public void beforeStep(StepExecution stepExecution) { this.destinationBasePath = stepExecution.getJobParameters().getString(DEST_BASE_PATH); //path to place the files this.boxFolder = new BoxFolder(this.boxAPIConnection, this.destinationBasePath); this.stepExecution = stepExecution; + this.ownerId = stepExecution.getJobParameters().getString(OWNER_ID); } /** @@ -111,6 +123,10 @@ private boolean ready(String fileName) { */ @Override public void write(List items) throws NoSuchAlgorithmException { + if(this.credential.isTokenExpires()){ + this.credential = authenticateCredentials.checkExpiryAndGenerateNew( usersEmail, ODSConstants.BOX, ownerId); + this.boxAPIConnection = new BoxAPIConnection(this.credential.getToken()); + } String fileName = items.get(0).getFileName(); prepareForUpload(fileName); BoxFileUploadSession session = this.fileMap.get(fileName); diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/dropbox/DropBoxChunkedWriter.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/dropbox/DropBoxChunkedWriter.java index bcd5f546..d895e7e1 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/dropbox/DropBoxChunkedWriter.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/dropbox/DropBoxChunkedWriter.java @@ -1,13 +1,17 @@ package org.onedatashare.transferservice.odstransferservice.service.step.dropbox; +import com.box.sdk.BoxUser; import com.dropbox.core.DbxException; import com.dropbox.core.v2.DbxClientV2; import com.dropbox.core.v2.files.CommitInfo; import com.dropbox.core.v2.files.FileMetadata; import com.dropbox.core.v2.files.UploadSessionCursor; +import com.dropbox.core.v2.users.FullAccount; import lombok.SneakyThrows; +import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants; import org.onedatashare.transferservice.odstransferservice.model.DataChunk; import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential; +import org.onedatashare.transferservice.odstransferservice.service.AuthenticateCredentials; import org.onedatashare.transferservice.odstransferservice.service.InfluxCache; import org.onedatashare.transferservice.odstransferservice.service.cron.MetricsCollector; import org.onedatashare.transferservice.odstransferservice.service.step.ODSBaseWriter; @@ -25,10 +29,11 @@ import java.util.List; import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.DEST_BASE_PATH; +import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.OWNER_ID; public class DropBoxChunkedWriter extends ODSBaseWriter implements ItemWriter { - private final OAuthEndpointCredential credential; + private OAuthEndpointCredential credential; private String destinationPath; private DbxClientV2 client; String sessionId; @@ -37,10 +42,16 @@ public class DropBoxChunkedWriter extends ODSBaseWriter implements ItemWriter items) throws Exception { + if(this.credential.isTokenExpires()){ + this.credential = authenticateCredentials.checkExpiryAndGenerateNew( usersEmail, ODSConstants.DROPBOX, ownerId); + this.client = new DbxClientV2(ODSUtility.dbxRequestConfig, this.credential.getToken()); + } for(DataChunk chunk : items){ this.client.files().uploadSessionAppendV2(cursor).uploadAndFinish(new ByteArrayInputStream(chunk.getData())); this.cursor = new UploadSessionCursor(sessionId, chunk.getStartPosition() + chunk.getSize()); diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/dropbox/DropBoxReader.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/dropbox/DropBoxReader.java index 519826d5..6f4855cc 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/dropbox/DropBoxReader.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/dropbox/DropBoxReader.java @@ -4,10 +4,12 @@ import com.dropbox.core.v2.DbxClientV2; import com.dropbox.core.v2.files.DownloadBuilder; import com.dropbox.core.v2.files.Metadata; +import org.onedatashare.transferservice.odstransferservice.constant.ODSConstants; import org.onedatashare.transferservice.odstransferservice.model.DataChunk; import org.onedatashare.transferservice.odstransferservice.model.EntityInfo; import org.onedatashare.transferservice.odstransferservice.model.FilePart; import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential; +import org.onedatashare.transferservice.odstransferservice.service.AuthenticateCredentials; import org.onedatashare.transferservice.odstransferservice.service.FilePartitioner; import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility; import org.slf4j.Logger; @@ -20,13 +22,14 @@ import java.io.ByteArrayOutputStream; import java.nio.file.Paths; +import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.OWNER_ID; import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.SOURCE_BASE_PATH; public class DropBoxReader extends AbstractItemCountingItemStreamItemReader { Logger logger = LoggerFactory.getLogger(DropBoxReader.class); private final EntityInfo fileInfo; - private final OAuthEndpointCredential credential; + private OAuthEndpointCredential credential; private String sBasePath; private FilePartitioner partitioner; @@ -34,12 +37,18 @@ public class DropBoxReader extends AbstractItemCountingItemStreamItemReader { Logger logger = LoggerFactory.getLogger(GDriveReader.class); private final EntityInfo fileInfo; - private final OAuthEndpointCredential credential; + private OAuthEndpointCredential credential; private final FilePartitioner partitioner; private Drive client; private String fileName; private File file; - public GDriveReader(OAuthEndpointCredential credential, EntityInfo fileInfo){ + String usersEmail; + String ownerId; + + private AuthenticateCredentials authenticateCredentials; + + public GDriveReader(OAuthEndpointCredential credential, EntityInfo fileInfo,AuthenticateCredentials authenticateCredentials){ this.credential = credential; this.fileInfo = fileInfo; this.partitioner = new FilePartitioner(fileInfo.getChunkSize()); this.setName(ClassUtils.getShortName(GDriveReader.class)); logger.info(credential.toString()); + this.authenticateCredentials = authenticateCredentials; } @Override protected DataChunk doRead() throws Exception { + if(this.credential.isTokenExpires()){ + this.credential = authenticateCredentials.checkExpiryAndGenerateNew( usersEmail, ODSConstants.GOOGLEDRIVE, ownerId); + doOpen(); + } FilePart filePart = this.partitioner.nextPart(); if(filePart == null || filePart.getSize() == 0) return null; Drive.Files.Get get = this.client.files().get(fileInfo.getId()); @@ -55,6 +73,13 @@ protected void doOpen() throws Exception { this.fileName = file.getName(); this.partitioner.createParts(this.fileInfo.getSize(), this.fileName); logger.info(file.toString()); + About about = client.about().get().execute(); + this.usersEmail = about.getUser().getEmailAddress(); + } + + @BeforeStep + public void beforeStep(StepExecution stepExecution){ + this.ownerId = stepExecution.getJobParameters().getString(OWNER_ID); } @Override diff --git a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/googleDrive/GDriveResumableWriter.java b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/googleDrive/GDriveResumableWriter.java index ec59c9ff..2518c4cb 100644 --- a/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/googleDrive/GDriveResumableWriter.java +++ b/src/main/java/org/onedatashare/transferservice/odstransferservice/service/step/googleDrive/GDriveResumableWriter.java @@ -1,6 +1,7 @@ package org.onedatashare.transferservice.odstransferservice.service.step.googleDrive; import com.google.api.services.drive.Drive; +import com.google.api.services.drive.model.About; import com.google.api.services.drive.model.File; import org.apache.commons.pool2.ObjectPool; import org.codehaus.jettison.json.JSONException; @@ -11,6 +12,7 @@ import org.onedatashare.transferservice.odstransferservice.model.credential.OAuthEndpointCredential; import org.onedatashare.transferservice.odstransferservice.pools.GDriveConnectionPool; import org.onedatashare.transferservice.odstransferservice.pools.HttpConnectionPool; +import org.onedatashare.transferservice.odstransferservice.service.AuthenticateCredentials; import org.onedatashare.transferservice.odstransferservice.utility.GDriveHelper; import org.onedatashare.transferservice.odstransferservice.utility.ODSUtility; import org.slf4j.Logger; @@ -36,12 +38,13 @@ import java.util.*; import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.GOOGLE_DRIVE_MIN_BYTES; +import static org.onedatashare.transferservice.odstransferservice.constant.ODSConstants.OWNER_ID; public class GDriveResumableWriter implements ItemWriter,SetPool { Logger logger = LoggerFactory.getLogger(GDriveResumableWriter.class); - private final OAuthEndpointCredential credential; + private OAuthEndpointCredential credential; private final EntityInfo fileInfo; private String basePath; private String fileName; @@ -50,12 +53,18 @@ public class GDriveResumableWriter implements ItemWriter,SetPool { private DataChunk readyChunk; + String usersEmail; + String ownerId; + + private AuthenticateCredentials authenticateCredentials; + private boolean failed; private boolean success; - public GDriveResumableWriter(OAuthEndpointCredential credential, EntityInfo fileInfo) { + public GDriveResumableWriter(OAuthEndpointCredential credential, EntityInfo fileInfo, AuthenticateCredentials authenticateCredentials) { this.credential = credential; this.fileInfo = fileInfo; + this.authenticateCredentials = authenticateCredentials; } @BeforeStep @@ -64,10 +73,11 @@ public void beforeStep(StepExecution stepExecution) throws IOException, GeneralS this.readyChunk = null; this.failed=false; this.success=false; + this.ownerId = stepExecution.getJobParameters().getString(OWNER_ID); } @BeforeWrite - public void beforeWrite(List items) throws IOException, JSONException, InterruptedException { + public void beforeWrite(List items) throws IOException, JSONException, InterruptedException, GeneralSecurityException { if(this.utility == null || this.utility.getSessionUri() == null){ this.logger.debug("Initializing resumable upload"); this.utility = GDriveHelper.builder() @@ -81,11 +91,21 @@ public void beforeWrite(List items) throws IOException, JSO throw new IOException("Unable to get the Location header from google drive. Error response code: "+status); } } + Drive client = ODSUtility.authenticateDriveClient(this.credential); + if(client!=null) { + About about = client.about().get().execute(); + if(about!= null) { + this.usersEmail = about.getUser().getEmailAddress(); + } + } } @Override public void write(List dataChunkList) { try { + if(this.credential.isTokenExpires()){ + this.credential = authenticateCredentials.checkExpiryAndGenerateNew( usersEmail, ODSConstants.GOOGLEDRIVE, ownerId); + } int chunkIndex = 0; while (chunkIndex < dataChunkList.size()) { DataChunk currentChunk = dataChunkList.get(chunkIndex);