Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .github/workflows/build-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,13 @@ jobs:
working-directory: ./sqrl-openai
run: |
docker run -i --rm -v $PWD:/build -e OPENAI_API_KEY="${{ secrets.OPENAI_API_KEY }}" datasqrl/cmd:latest test openai-cicd-test.sqrl --snapshot snapshots-openai-cicd-test

- name: Run SQRL Secure Test
working-directory: ./sqrl-secure
run: |
docker run -i --rm -v $PWD:/build datasqrl/cmd:latest test secure-cicd-test.sqrl --snapshot snapshots-secure-cicd-test

- name: Run SQRL Text Test
working-directory: ./sqrl-text
run: |
docker run -i --rm -v $PWD:/build datasqrl/cmd:latest test text-cicd-test.sqrl --snapshot snapshots-text-cicd-test
11 changes: 10 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
<name>sqrl-functions</name>
<url>http://www.datasqrl.com</url>
<modules>
<module>sqrl-lib-common</module>
<module>sqrl-math</module>
<module>sqrl-openai</module>
<module>sqrl-openai</module>
<module>sqrl-secure</module>
<module>sqrl-text</module>
</modules>

<properties>
Expand All @@ -27,6 +30,12 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>com.datasqrl</groupId>
<artifactId>sqrl-lib-common</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
Expand Down
24 changes: 24 additions & 0 deletions sqrl-lib-common/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.datasqrl</groupId>
<artifactId>sqrl-functions</artifactId>
<version>0.5.0-SNAPSHOT</version>
</parent>

<artifactId>sqrl-lib-common</artifactId>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.datasqrl.datatype;

import lombok.SneakyThrows;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.InputGroup;
import org.apache.flink.table.functions.ScalarFunction;

/**
* Converts an annotated data type to
*/
public class SerializeToBytes extends ScalarFunction {

@SneakyThrows
public byte[] eval(@DataTypeHint(inputGroup = InputGroup.ANY) Object object) {
DataTypeHint hint = object.getClass().getAnnotation(DataTypeHint.class);
Class<? extends TypeSerializer> serializerClass = hint.rawSerializer();

TypeSerializer serializer = serializerClass.newInstance();

DataOutputSerializer dos = new DataOutputSerializer(128);

serializer.serialize(object, dos);

return dos.getCopyOfBuffer();
}
}
117 changes: 117 additions & 0 deletions sqrl-lib-common/src/main/java/com/datasqrl/function/FlinkTypeUtil.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package com.datasqrl.function;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import lombok.Builder;
import lombok.Singular;
import lombok.SneakyThrows;
import lombok.Value;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.inference.TypeInference;
import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.table.types.inference.utils.AdaptedCallContext;

public class FlinkTypeUtil {

public static TypeStrategy nullPreservingOutputStrategy(DataType outputType) {
return callContext -> {
DataType type = getFirstArgumentType(callContext);

if (type.getLogicalType().isNullable()) {
return Optional.of(outputType.nullable());
}

return Optional.of(outputType.notNull());
};
}

public static TypeInference basicNullInference(DataType outputType, DataType inputType) {
return TypeInference.newBuilder()
.typedArguments(inputType)
.outputTypeStrategy(nullPreservingOutputStrategy(outputType))
.build();
}

public static TypeInference.Builder basicNullInferenceBuilder(DataType outputType, DataType inputType) {
return TypeInference.newBuilder()
.typedArguments(inputType)
.outputTypeStrategy(nullPreservingOutputStrategy(outputType));
}

@SneakyThrows
public static DataType getFirstArgumentType(CallContext callContext) {
if (callContext instanceof AdaptedCallContext) {
Field privateField = AdaptedCallContext.class.getDeclaredField("originalContext");
privateField.setAccessible(true);
CallContext originalContext = (CallContext) privateField.get(callContext);

return originalContext
.getArgumentDataTypes()
.get(0);
} else {
return callContext.getArgumentDataTypes().get(0);
}
}

@Value
@Builder
public static class VariableArguments implements InputTypeStrategy {

@Singular
List<DataType> staticTypes;
DataType variableType;
int minVariableArguments;
int maxVariableArguments;

@Override
public ArgumentCount getArgumentCount() {
return new ArgumentCount() {
@Override
public boolean isValidCount(int count) {
int variableCount = count - staticTypes.size();
return variableCount >= minVariableArguments && variableCount <= maxVariableArguments;
}

@Override
public Optional<Integer> getMinCount() {
return Optional.of(staticTypes.size()+minVariableArguments);
}

@Override
public Optional<Integer> getMaxCount() {
return Optional.of(staticTypes.size()+maxVariableArguments);
}
};
}

@Override
public Optional<List<DataType>> inferInputTypes(CallContext callContext,
boolean throwOnFailure) {
int argCount = callContext.getArgumentDataTypes().size();
int varArgs = argCount - staticTypes.size();
if (varArgs < 0 || varArgs < minVariableArguments || varArgs > maxVariableArguments)
return Optional.empty();
ArrayList<DataType> result = new ArrayList<>(argCount);
result.addAll(staticTypes);
for (int i = 0; i < varArgs; i++) {
result.add(variableType);
}
return Optional.of(result);
}

@Override
public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
List<Signature.Argument> arguments = new ArrayList<>(staticTypes.size()+1);
staticTypes.stream().map(DataType::toString).map(Signature.Argument::of).forEach(arguments::add);
arguments.add(Signature.Argument.of(variableType.toString() + "..."));
return List.of(Signature.of(arguments));
}
}
}
18 changes: 18 additions & 0 deletions sqrl-lib-common/src/main/java/com/datasqrl/json/FlinkJsonType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.datasqrl.json;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.table.annotation.DataTypeHint;

@DataTypeHint(value = "RAW", bridgedTo = FlinkJsonType.class,
rawSerializer = FlinkJsonTypeSerializer.class)
public class FlinkJsonType {
public JsonNode json;

public FlinkJsonType(JsonNode json) {
this.json = json;
}

public JsonNode getJson() {
return json;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package com.datasqrl.json;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import java.io.IOException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

public class FlinkJsonTypeSerializer extends TypeSerializer<FlinkJsonType> {

ObjectMapper mapper = new ObjectMapper();
@Override
public boolean isImmutableType() {
return true;
}

@Override
public FlinkJsonType createInstance() {
return new FlinkJsonType(null);
}

@Override
public FlinkJsonType copy(FlinkJsonType from) {
return new FlinkJsonType(from.getJson());
}

@Override
public FlinkJsonType copy(FlinkJsonType from, FlinkJsonType reuse) {
return copy(from);
}

@Override
public int getLength() {
return -1; // indicates that this serializer does not have a fixed length
}

@Override
public void serialize(FlinkJsonType record, DataOutputView target) throws IOException {
byte[] jsonData = mapper.writeValueAsBytes(record.getJson());
target.writeInt(jsonData.length);
target.write(jsonData);
}

@Override
public FlinkJsonType deserialize(DataInputView source) throws IOException {
int length = source.readInt();
byte[] jsonData = new byte[length];
source.readFully(jsonData);
return new FlinkJsonType(mapper.readTree(jsonData));
}

@Override
public FlinkJsonType deserialize(FlinkJsonType reuse, DataInputView source) throws IOException {
return deserialize(source);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
int length = source.readInt();
byte[] jsonData = new byte[length];
source.readFully(jsonData);
target.writeInt(length);
target.write(jsonData);
}

@Override
public TypeSerializer<FlinkJsonType> duplicate() {
return this;
}

@Override
public boolean equals(Object obj) {
return obj instanceof FlinkJsonTypeSerializer;
}

@Override
public int hashCode() {
return FlinkJsonTypeSerializer.class.hashCode();
}

@Override
public TypeSerializerSnapshot<FlinkJsonType> snapshotConfiguration() {
return new FlinkJsonTypeSerializerSnapshot();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.datasqrl.json;

import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

public class FlinkJsonTypeSerializerSnapshot implements TypeSerializerSnapshot<FlinkJsonType> {

private Class<FlinkJsonTypeSerializer> serializerClass;

public FlinkJsonTypeSerializerSnapshot() {
this.serializerClass = FlinkJsonTypeSerializer.class;
}

@Override
public int getCurrentVersion() {
return 1;
}

@Override
public void writeSnapshot(DataOutputView out) throws IOException {
out.writeUTF(FlinkJsonTypeSerializer.class.getName());
}

@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader)
throws IOException {
String className = in.readUTF();
try {
this.serializerClass = (Class<FlinkJsonTypeSerializer>) Class.forName(className, true,
userCodeClassLoader);
} catch (ClassNotFoundException e) {
throw new IOException("Failed to find serializer class: " + className, e);
}
}

@Override
public TypeSerializer restoreSerializer() {
try {
return serializerClass.newInstance();
} catch (InstantiationException | IllegalAccessException e) {
throw new RuntimeException(
"Failed to instantiate serializer class: " + serializerClass.getName(), e);
}
}

@Override
public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(
TypeSerializer newSerializer) {
if (newSerializer.getClass() == this.serializerClass) {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
} else {
return TypeSerializerSchemaCompatibility.incompatible();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.datasqrl.vector;

import org.apache.flink.table.annotation.DataTypeHint;

@DataTypeHint(value = "RAW", bridgedTo = FlinkVectorType.class, rawSerializer = FlinkVectorTypeSerializer.class)
public class FlinkVectorType {
public double[] value;

public FlinkVectorType(double[] value) {
this.value = value;
}

public double[] getValue() {
return value;
}
}
Loading
Loading