Skip to content

Commit

Permalink
feat(assertions): Adding Assertions Entity & Great Expectations BETA (d…
Browse files Browse the repository at this point in the history
  • Loading branch information
jjoyce0510 authored Mar 4, 2022
1 parent 9a9a5c3 commit 9f1c5a8
Show file tree
Hide file tree
Showing 75 changed files with 6,243 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import com.linkedin.datahub.graphql.generated.ActorFilter;
import com.linkedin.datahub.graphql.generated.AggregationMetadata;
import com.linkedin.datahub.graphql.generated.Aspect;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.BrowseResults;
import com.linkedin.datahub.graphql.generated.Chart;
import com.linkedin.datahub.graphql.generated.ChartInfo;
Expand Down Expand Up @@ -46,11 +47,15 @@
import com.linkedin.datahub.graphql.generated.UserUsageCounts;
import com.linkedin.datahub.graphql.resolvers.AuthenticatedResolver;
import com.linkedin.datahub.graphql.resolvers.MeResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.AssertionRunEventResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.DeleteAssertionResolver;
import com.linkedin.datahub.graphql.resolvers.assertion.EntityAssertionsResolver;
import com.linkedin.datahub.graphql.resolvers.auth.GetAccessTokenResolver;
import com.linkedin.datahub.graphql.resolvers.browse.BrowsePathsResolver;
import com.linkedin.datahub.graphql.resolvers.browse.BrowseResolver;
import com.linkedin.datahub.graphql.resolvers.config.AppConfigResolver;
import com.linkedin.datahub.graphql.resolvers.container.ContainerEntitiesResolver;
import com.linkedin.datahub.graphql.resolvers.dataset.DatasetHealthResolver;
import com.linkedin.datahub.graphql.resolvers.deprecation.UpdateDeprecationResolver;
import com.linkedin.datahub.graphql.resolvers.domain.CreateDomainResolver;
import com.linkedin.datahub.graphql.resolvers.domain.DomainEntitiesResolver;
Expand Down Expand Up @@ -117,6 +122,7 @@
import com.linkedin.datahub.graphql.types.LoadableType;
import com.linkedin.datahub.graphql.types.SearchableEntityType;
import com.linkedin.datahub.graphql.types.aspect.AspectType;
import com.linkedin.datahub.graphql.types.assertion.AssertionType;
import com.linkedin.datahub.graphql.types.chart.ChartType;
import com.linkedin.datahub.graphql.types.common.mappers.OperationMapper;
import com.linkedin.datahub.graphql.types.container.ContainerType;
Expand Down Expand Up @@ -144,6 +150,7 @@
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.recommendation.RecommendationsService;
import com.linkedin.metadata.secret.SecretService;
import com.linkedin.metadata.timeseries.TimeseriesAspectService;
import com.linkedin.metadata.version.GitVersion;
import com.linkedin.usage.UsageClient;
import graphql.execution.DataFetcherResult;
Expand Down Expand Up @@ -193,6 +200,7 @@ public class GmsGraphQLEngine {
private final TokenService tokenService;
private final SecretService secretService;
private final GitVersion gitVersion;
private final TimeseriesAspectService timeseriesAspectService;

private final IngestionConfiguration ingestionConfiguration;

Expand All @@ -215,6 +223,7 @@ public class GmsGraphQLEngine {
private final UsageType usageType;
private final ContainerType containerType;
private final DomainType domainType;
private final AssertionType assertionType;


/**
Expand Down Expand Up @@ -255,6 +264,7 @@ public GmsGraphQLEngine() {
null,
null,
null,
null,
null);
}

Expand All @@ -266,6 +276,7 @@ public GmsGraphQLEngine(
final EntityService entityService,
final RecommendationsService recommendationsService,
final TokenService tokenService,
final TimeseriesAspectService timeseriesAspectService,
final EntityRegistry entityRegistry,
final SecretService secretService,
final IngestionConfiguration ingestionConfiguration,
Expand All @@ -283,6 +294,7 @@ public GmsGraphQLEngine(
this.secretService = secretService;
this.entityRegistry = entityRegistry;
this.gitVersion = gitVersion;
this.timeseriesAspectService = timeseriesAspectService;

this.ingestionConfiguration = Objects.requireNonNull(ingestionConfiguration);

Expand All @@ -305,6 +317,7 @@ public GmsGraphQLEngine(
this.usageType = new UsageType(this.usageClient);
this.containerType = new ContainerType(entityClient);
this.domainType = new DomainType(entityClient);
this.assertionType = new AssertionType(entityClient);

// Init Lists
this.entityTypes = ImmutableList.of(
Expand All @@ -324,7 +337,8 @@ public GmsGraphQLEngine(
dataJobType,
glossaryTermType,
containerType,
domainType
domainType,
assertionType
);
this.loadableTypes = new ArrayList<>(entityTypes);
this.ownerTypes = ImmutableList.of(corpUserType, corpGroupType);
Expand Down Expand Up @@ -456,6 +470,7 @@ public void configureRuntimeWiring(final RuntimeWiring.Builder builder) {
configureContainerResolvers(builder);
configureGlossaryTermResolvers(builder);
configureDomainResolvers(builder);
configureAssertionResolvers(builder);
configurePolicyResolvers(builder);
}

Expand Down Expand Up @@ -567,6 +582,9 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("mlModelGroup", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(mlModelGroupType,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("assertion", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(assertionType,
(env) -> env.getArgument(URN_FIELD_NAME))))
.dataFetcher("listPolicies",
new ListPoliciesResolver(this.entityClient))
.dataFetcher("listUsers",
Expand Down Expand Up @@ -637,6 +655,7 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("deleteIngestionSource", new DeleteIngestionSourceResolver(this.entityClient))
.dataFetcher("createIngestionExecutionRequest", new CreateIngestionExecutionRequestResolver(this.entityClient, this.ingestionConfiguration))
.dataFetcher("cancelIngestionExecutionRequest", new CancelIngestionExecutionRequestResolver(this.entityClient))
.dataFetcher("deleteAssertion", new DeleteAssertionResolver(this.entityClient, this.entityService))
);
}

Expand Down Expand Up @@ -733,9 +752,11 @@ private void configureDatasetResolvers(final RuntimeWiring.Builder builder) {
)
))
.dataFetcher("usageStats", new AuthenticatedResolver<>(new UsageTypeResolver()))
.dataFetcher("health", new DatasetHealthResolver(graphClient, timeseriesAspectService))
.dataFetcher("schemaMetadata", new AuthenticatedResolver<>(
new AspectResolver())
)
.dataFetcher("assertions", new EntityAssertionsResolver(entityClient, graphClient))
.dataFetcher("aspects", new AuthenticatedResolver<>(
new WeaklyTypedAspectsResolver(entityClient, entityRegistry))
)
Expand Down Expand Up @@ -768,6 +789,7 @@ private void configureDatasetResolvers(final RuntimeWiring.Builder builder) {
(env) -> ((InstitutionalMemoryMetadata) env.getSource()).getAuthor().getUrn()))
)
);

}

private void configureGlossaryTermResolvers(final RuntimeWiring.Builder builder) {
Expand Down Expand Up @@ -1114,6 +1136,14 @@ private void configureDomainResolvers(final RuntimeWiring.Builder builder) {
);
}

private void configureAssertionResolvers(final RuntimeWiring.Builder builder) {
builder.type("Assertion", typeWiring -> typeWiring.dataFetcher("relationships",
new AuthenticatedResolver<>(new EntityRelationshipsResultResolver(graphClient)))
.dataFetcher("platform", new AuthenticatedResolver<>(
new LoadableTypeResolver<>(dataPlatformType, (env) -> ((Assertion) env.getSource()).getPlatform().getUrn())))
.dataFetcher("runEvents", new AssertionRunEventResolver(entityClient)));
}

private void configurePolicyResolvers(final RuntimeWiring.Builder builder) {
// Register resolvers for "resolvedUsers" and "resolvedGroups" field of the Policy type.
builder.type("ActorFilter", typeWiring -> typeWiring
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package com.linkedin.datahub.graphql.resolvers.assertion;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.Assertion;
import com.linkedin.datahub.graphql.generated.AssertionResultType;
import com.linkedin.datahub.graphql.generated.AssertionRunEvent;
import com.linkedin.datahub.graphql.generated.AssertionRunEventsResult;
import com.linkedin.datahub.graphql.generated.AssertionRunStatus;
import com.linkedin.datahub.graphql.types.dataset.mappers.AssertionRunEventMapper;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.EnvelopedAspect;
import com.linkedin.metadata.query.filter.ConjunctiveCriterion;
import com.linkedin.metadata.query.filter.ConjunctiveCriterionArray;
import com.linkedin.metadata.query.filter.Criterion;
import com.linkedin.metadata.query.filter.CriterionArray;
import com.linkedin.metadata.query.filter.Filter;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;


/**
* GraphQL Resolver used for fetching AssertionRunEvents.
*/
public class AssertionRunEventResolver implements DataFetcher<CompletableFuture<AssertionRunEventsResult>> {

private final EntityClient _client;

public AssertionRunEventResolver(final EntityClient client) {
_client = client;
}

@Override
public CompletableFuture<AssertionRunEventsResult> get(DataFetchingEnvironment environment) {
return CompletableFuture.supplyAsync(() -> {

final QueryContext context = environment.getContext();

final String urn = ((Assertion) environment.getSource()).getUrn();
final String maybeStatus = environment.getArgumentOrDefault("status", null);
final Long maybeStartTimeMillis = environment.getArgumentOrDefault("startTimeMillis", null);
final Long maybeEndTimeMillis = environment.getArgumentOrDefault("endTimeMillis", null);
final Integer maybeLimit = environment.getArgumentOrDefault("limit", null);

try {
// Step 1: Fetch aspects from GMS
List<EnvelopedAspect> aspects = _client.getTimeseriesAspectValues(
urn,
Constants.ASSERTION_ENTITY_NAME,
Constants.ASSERTION_RUN_EVENT_ASPECT_NAME,
maybeStartTimeMillis,
maybeEndTimeMillis,
maybeLimit,
false,
buildStatusFilter(maybeStatus),
context.getAuthentication());

// Step 2: Bind profiles into GraphQL strong types.
List<AssertionRunEvent> runEvents = aspects.stream().map(AssertionRunEventMapper::map).collect(Collectors.toList());

// Step 3: Package and return response.
final AssertionRunEventsResult result = new AssertionRunEventsResult();
result.setTotal(runEvents.size());
result.setFailed(Math.toIntExact(runEvents.stream().filter(runEvent ->
AssertionRunStatus.COMPLETE.equals(runEvent.getStatus())
&& runEvent.getResult() != null
&& AssertionResultType.FAILURE.equals(
runEvent.getResult().getType()
)).count()));
result.setSucceeded(Math.toIntExact(runEvents.stream().filter(runEvent ->
AssertionRunStatus.COMPLETE.equals(runEvent.getStatus())
&& runEvent.getResult() != null
&& AssertionResultType.SUCCESS.equals(runEvent.getResult().getType()
)).count()));
result.setRunEvents(runEvents);
return result;
} catch (RemoteInvocationException e) {
throw new RuntimeException("Failed to retrieve Assertion Run Events from GMS", e);
}
});
}

@Nullable
private Filter buildStatusFilter(@Nullable final String status) {
if (status == null) {
return null;
}
return new Filter().setOr(new ConjunctiveCriterionArray(ImmutableList.of(
new ConjunctiveCriterion().setAnd(new CriterionArray(ImmutableList.of(
new Criterion()
.setField("status")
.setValue(status)
)))
)));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.linkedin.datahub.graphql.resolvers.assertion;

import com.google.common.collect.ImmutableList;
import com.linkedin.assertion.AssertionInfo;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.authorization.ConjunctivePrivilegeGroup;
import com.linkedin.datahub.graphql.authorization.DisjunctivePrivilegeGroup;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.resolvers.AuthUtils;
import com.linkedin.datahub.graphql.resolvers.mutate.MutationUtils;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.entity.EntityService;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.concurrent.CompletableFuture;


/**
* GraphQL Resolver that deletes an Assertion.
*/
public class DeleteAssertionResolver implements DataFetcher<CompletableFuture<Boolean>> {

private final EntityClient _entityClient;
private final EntityService _entityService;

public DeleteAssertionResolver(final EntityClient entityClient, final EntityService entityService) {
_entityClient = entityClient;
_entityService = entityService;
}

@Override
public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();
final Urn assertionUrn = Urn.createFromString(environment.getArgument("urn"));
return CompletableFuture.supplyAsync(() -> {

// 1. check the entity exists. If not, return false.
if (!_entityService.exists(assertionUrn)) {
return true;
}

if (isAuthorizedToDeleteAssertion(context, assertionUrn)) {
try {
_entityClient.deleteEntity(assertionUrn, context.getAuthentication());
return true;
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to perform delete against assertion with urn %s", assertionUrn), e);
}
}
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
});
}

/**
* Determine whether the current user is allowed to remove an assertion.
*/
private boolean isAuthorizedToDeleteAssertion(final QueryContext context, final Urn assertionUrn) {

// 2. fetch the assertion info
AssertionInfo info =
(AssertionInfo) MutationUtils.getAspectFromEntity(
assertionUrn.toString(), Constants.ASSERTION_INFO_ASPECT_NAME, _entityService, null);

if (info != null) {
// 3. check whether the actor has permission to edit the assertions on the assertee
final Urn asserteeUrn = getAsserteeUrnFromInfo(info);
return isAuthorizedToDeleteAssertionFromAssertee(context, asserteeUrn);
}

return true;
}

private boolean isAuthorizedToDeleteAssertionFromAssertee(final QueryContext context, final Urn asserteeUrn) {
final DisjunctivePrivilegeGroup orPrivilegeGroups = new DisjunctivePrivilegeGroup(ImmutableList.of(
AuthUtils.ALL_PRIVILEGES_GROUP,
new ConjunctivePrivilegeGroup(ImmutableList.of(PoliciesConfig.EDIT_ENTITY_ASSERTIONS_PRIVILEGE.getType()))
));
return AuthorizationUtils.isAuthorized(
context.getAuthorizer(),
context.getActorUrn(),
asserteeUrn.getEntityType(),
asserteeUrn.toString(),
orPrivilegeGroups);
}

private Urn getAsserteeUrnFromInfo(final AssertionInfo info) {
switch (info.getType()) {
case DATASET:
return info.getDatasetAssertion().getDataset();
default:
throw new RuntimeException(String.format("Unsupported Assertion Type %s provided", info.getType()));
}
}
}
Loading

0 comments on commit 9f1c5a8

Please sign in to comment.