diff --git a/database-plugins/src/main/java/io/cdap/plugin/db/batch/source/DBSource.java b/database-plugins/src/main/java/io/cdap/plugin/db/batch/source/DBSource.java
index 3f487e92d..787a65527 100644
--- a/database-plugins/src/main/java/io/cdap/plugin/db/batch/source/DBSource.java
+++ b/database-plugins/src/main/java/io/cdap/plugin/db/batch/source/DBSource.java
@@ -41,6 +41,7 @@
 import io.cdap.plugin.DBRecord;
 import io.cdap.plugin.FieldCase;
 import io.cdap.plugin.StructuredRecordUtils;
+import io.cdap.plugin.common.Asset;
 import io.cdap.plugin.common.LineageRecorder;
 import io.cdap.plugin.common.ReferenceBatchSource;
 import io.cdap.plugin.common.ReferencePluginConfig;
@@ -49,6 +50,7 @@
 import io.cdap.plugin.common.db.DriverCleanup;
 import io.cdap.plugin.db.batch.TransactionIsolationLevel;
 import io.cdap.plugin.db.common.DBBaseConfig;
+import io.cdap.plugin.db.common.DBURLParser;
 import io.cdap.plugin.db.connector.DBConnector;
 import io.cdap.plugin.db.connector.DBConnectorConfig;
 import org.apache.hadoop.conf.Configuration;
@@ -59,6 +61,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.URI;
 import java.sql.Connection;
 import java.sql.Driver;
 import java.sql.DriverManager;
@@ -270,6 +273,15 @@ private Connection getConnection() throws SQLException {
     return DriverManager.getConnection(sourceConfig.getConnectionString(), properties);
   }
 
+  protected LineageRecorder getLineageRecorder(BatchSourceContext context) {
+    // dbtype, host, port, db from the connection string
+    // table is the reference name
+    URI uri = DBURLParser.parseURL(sourceConfig.getConnectionString());
+    String fqn = DBURLParser.constructFQN(uri, sourceConfig.getReferenceName());
+    Asset asset = Asset.builder(sourceConfig.getReferenceName()).setFqn(fqn).build();
+    return new LineageRecorder(context, asset);
+  }
+
   /**
    * {@link PluginConfig} for {@link DBSource}
    */
diff --git a/database-plugins/src/main/java/io/cdap/plugin/db/common/DBURLParser.java b/database-plugins/src/main/java/io/cdap/plugin/db/common/DBURLParser.java
new file mode 100644
index 000000000..6853d04f7
--- /dev/null
+++ b/database-plugins/src/main/java/io/cdap/plugin/db/common/DBURLParser.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright © 2022 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.db.common;
+
+import java.net.URI;
+
+/**
+ * URL Parser for DB
+ */
+
+public class DBURLParser {
+  public static URI parseURL(String connectionString) {
+    // Remove the 'jdbc:' prefix from the connection string
+    String cleanURI = connectionString.substring(5);
+    URI uri = URI.create(cleanURI);
+    return uri;
+  }
+
+  public static String constructFQN(URI uri, String tableName) {
+    if (uri.getScheme() == "postgres") {
+      return "";
+    } else {
+      return String.format("%s://%s:%s/%s/%s", uri.getScheme(), uri.getHost(), uri.getPort(), uri.getPath(), tableName);
+    }
+  }
+}