diff --git a/.gitignore b/.gitignore
index 055cec09d..99d8ba8cb 100644
--- a/.gitignore
+++ b/.gitignore
@@ -298,3 +298,4 @@ temp_*
*.trace.db
!oshdb-api/src/test/resources/test-data.mv.db
!oshdb-util/src/test/resources/test-data.mv.db
+!oshdb-osm-source/src/test/resources/sample.pbf
diff --git a/oshdb-osm-source/pom.xml b/oshdb-osm-source/pom.xml
new file mode 100644
index 000000000..dd0128eef
--- /dev/null
+++ b/oshdb-osm-source/pom.xml
@@ -0,0 +1,73 @@
+
+
+ 4.0.0
+
+ oshdb-parent
+ org.heigit.ohsome
+ 1.0.0-SNAPSHOT
+
+ oshdb-osm-source
+
+
+
+
+
+
+
+ io.projectreactor
+ reactor-bom
+ 2020.0.24
+ pom
+ import
+
+
+
+
+
+
+
+ io.projectreactor
+ reactor-core
+
+
+
+ org.heigit.ohsome
+ oshdb-util
+ ${project.version}
+
+
+
+
+ com.google.protobuf
+ protobuf-java
+ 3.21.9
+
+
+
+ org.openstreetmap.pbf
+ osmpbf
+ 1.5.0
+
+
+ com.google.protobuf
+ protobuf-java
+
+
+
+
+
+ io.projectreactor
+ reactor-test
+ test
+
+
+ org.assertj
+ assertj-core
+ 3.23.1
+ test
+
+
+
+
\ No newline at end of file
diff --git a/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/TagTranslator.java b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/TagTranslator.java
new file mode 100644
index 000000000..4716896d1
--- /dev/null
+++ b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/TagTranslator.java
@@ -0,0 +1,18 @@
+package org.heigit.ohsome.oshdb;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMRole;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMTag;
+
+public interface TagTranslator {
+
+ Map getOSHDBTagOf(Collection extends OSMTag> tags);
+
+ Map lookupTag(Set extends OSHDBTag> tags);
+
+ Map getOSHDBRoleOf(Collection extends OSMRole> values);
+
+ Map lookupRole(Set extends OSHDBRole> roles);
+}
diff --git a/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/OSMSource.java b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/OSMSource.java
new file mode 100644
index 000000000..09b112dfc
--- /dev/null
+++ b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/OSMSource.java
@@ -0,0 +1,9 @@
+package org.heigit.ohsome.oshdb.osm;
+
+import org.heigit.ohsome.oshdb.TagTranslator;
+import reactor.core.publisher.Flux;
+
+public interface OSMSource {
+
+ Flux entities(TagTranslator translator);
+}
diff --git a/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/pbf/Blob.java b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/pbf/Blob.java
new file mode 100644
index 000000000..9f7991545
--- /dev/null
+++ b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/pbf/Blob.java
@@ -0,0 +1,166 @@
+package org.heigit.ohsome.oshdb.osm.pbf;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import crosby.binary.Fileformat;
+import crosby.binary.Osmformat;
+import crosby.binary.file.FileFormatException;
+import java.io.DataInputStream;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.NoSuchElementException;
+import java.util.zip.DataFormatException;
+import java.util.zip.Inflater;
+import reactor.core.publisher.Mono;
+
+/**
+ * One length-prefixed blob of an OSM PBF stream: the type named by its {@code BlobHeader}, the
+ * still encoded {@code Blob} message bytes and the stream position reported when it was read.
+ */
+public class Blob {
+
+  private static final int MAX_HEADER_SIZE = 64 * 1024;
+
+  /**
+   * Reads the next blob from {@code input}: a 4-byte header length, the {@code BlobHeader} and
+   * the payload whose size the header announces.
+   *
+   * @param input stream positioned at the start of a blob
+   * @return the blob that was read
+   * @throws FileFormatException if a size field is negative or the header exceeds the maximum
+   * @throws IOException if reading fails, e.g. {@code EOFException} when the stream has ended
+   */
+  public static Blob read(InputStream input) throws IOException {
+    DataInputStream dataInput = new DataInputStream(input);
+    var headerSize = dataInput.readInt();
+    // Negative sizes would otherwise surface as NegativeArraySizeException further down.
+    if (headerSize < 0 || headerSize > MAX_HEADER_SIZE) {
+      throw new FileFormatException("Unexpected header size " + headerSize
+          + " bytes (maximum " + MAX_HEADER_SIZE + "). Possibly corrupt file.");
+    }
+
+    var buf = new byte[headerSize];
+    dataInput.readFully(buf);
+    var header = Fileformat.BlobHeader.parseFrom(buf);
+
+    var offset = position(input);
+
+    var dataSize = header.getDatasize();
+    if (dataSize < 0) {
+      throw new FileFormatException(
+          "Negative blob size " + dataSize + ". Possibly corrupt file.");
+    }
+    var data = new byte[dataSize];
+    dataInput.readFully(data);
+
+    return new Blob(header.getType(), offset, data);
+  }
+
+  /** Returns the channel position of a {@link FileInputStream}, or -1 for any other stream. */
+  private static long position(InputStream input) throws IOException {
+    if (input instanceof FileInputStream) {
+      return ((FileInputStream) input).getChannel().position();
+    }
+    return -1;
+  }
+
+  private final String type;
+  private final long offset;
+  private final byte[] data;
+
+  private Blob(String type, long offset, byte[] data) {
+    this.type = type;
+    this.offset = offset;
+    this.data = data;
+  }
+
+  /** Returns the position recorded right after the blob header, or -1 if it was unknown. */
+  public long offset() {
+    return offset;
+  }
+
+  /** Returns the encoded {@code Blob} message bytes; the internal array is handed out as is. */
+  public byte[] data() {
+    return data;
+  }
+
+  public boolean isHeader() {
+    return "OSMHeader".equals(type);
+  }
+
+  /**
+   * Decodes this blob as the header block of the file.
+   *
+   * @throws NoSuchElementException if this blob is not of type {@code OSMHeader}
+   * @throws FileFormatException if the content cannot be decompressed or parsed
+   */
+  public Osmformat.HeaderBlock header() throws FileFormatException {
+    if (!isHeader()) {
+      throw new NoSuchElementException();
+    }
+
+    try {
+      return Osmformat.HeaderBlock.parseFrom(decompress());
+    } catch (InvalidProtocolBufferException e) {
+      throw new FileFormatException(e);
+    }
+  }
+
+  public boolean isData() {
+    return "OSMData".equals(type);
+  }
+
+  /**
+   * Returns a {@code Mono} that decompresses and parses this blob once it is subscribed to, or
+   * one that fails with {@code NoSuchElementException} if the blob is not of type
+   * {@code OSMData}.
+   */
+  public Mono<Block> block() {
+    if (!isData()) {
+      return Mono.error(new NoSuchElementException());
+    }
+    return Mono.fromCallable(() -> Block.parse(this, decompress()));
+  }
+
+  /** Returns the payload bytes, inflating them first when they are zlib compressed. */
+  private byte[] decompress() throws FileFormatException {
+    var blob = parseBlob();
+    if (blob.hasRaw()) {
+      return blob.getRaw().toByteArray();
+    }
+    if (blob.hasZlibData()) {
+      return decompress(blob);
+    }
+    // Other compression schemes are not handled.
+    throw new UnsupportedOperationException();
+  }
+
+  /** Inflates the zlib payload, which has to fill the announced raw size exactly. */
+  private static byte[] decompress(Fileformat.Blob blob) throws FileFormatException {
+    var buffer = new byte[blob.getRawSize()];
+    Inflater inflater = new Inflater();
+    try {
+      inflater.setInput(blob.getZlibData().toByteArray());
+      var inflated = inflater.inflate(buffer);
+      // Checked explicitly: a plain assert is skipped unless the JVM runs with -ea.
+      if (!inflater.finished() || inflated != buffer.length) {
+        throw new FileFormatException("Compressed blob does not inflate to the announced "
+            + buffer.length + " bytes. Possibly corrupt file.");
+      }
+    } catch (DataFormatException e) {
+      throw new FileFormatException(e);
+    } finally {
+      inflater.end();
+    }
+    return buffer;
+  }
+
+  /** Parses the raw bytes as a {@code Fileformat.Blob} message. */
+  private Fileformat.Blob parseBlob() throws FileFormatException {
+    try {
+      return Fileformat.Blob.parseFrom(data);
+    } catch (InvalidProtocolBufferException e) {
+      throw new FileFormatException(e);
+    }
+  }
+}
diff --git a/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/pbf/Block.java b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/pbf/Block.java
new file mode 100644
index 000000000..9c16b1199
--- /dev/null
+++ b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/pbf/Block.java
@@ -0,0 +1,295 @@
+package org.heigit.ohsome.oshdb.osm.pbf;
+
+import static java.lang.Boolean.TRUE;
+import static java.lang.Math.toIntExact;
+import static java.util.Spliterators.spliterator;
+import static java.util.function.Predicate.not;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.StreamSupport.stream;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import crosby.binary.Osmformat;
+import crosby.binary.Osmformat.DenseNodes;
+import crosby.binary.Osmformat.PrimitiveGroup;
+import crosby.binary.file.FileFormatException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Spliterator;
+import java.util.function.IntFunction;
+import java.util.stream.Stream;
+import org.heigit.ohsome.oshdb.OSHDBRole;
+import org.heigit.ohsome.oshdb.OSHDBTag;
+import org.heigit.ohsome.oshdb.osm.OSM;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMMember;
+import org.heigit.ohsome.oshdb.osm.OSMNode;
+import org.heigit.ohsome.oshdb.osm.OSMRelation;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.osm.OSMWay;
+import org.heigit.ohsome.oshdb.util.exceptions.OSHDBException;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMRole;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMTag;
+
+public class Block {
+
+ public static Block parse(Blob blob, byte[] data) throws FileFormatException {
+ try {
+ var block = Osmformat.PrimitiveBlock.parseFrom(data);
+
+ var granularity = block.getGranularity();
+ var latOffset = block.getLatOffset();
+ var lonOffset = block.getLonOffset();
+ var dateGranularity = block.getDateGranularity();
+
+ if (granularity != 100) {
+ throw new OSHDBException("expected granularity must be 100! But got " + granularity);
+ }
+ if (dateGranularity != 1000) {
+ throw new OSHDBException(
+ "expected date granularity must be 1000! But got " + dateGranularity);
+ }
+ if (lonOffset != 0 || latOffset != 0) {
+ throw new OSHDBException(
+ "expected lon/lat offset must be 0! But got " + lonOffset + "/" + latOffset);
+ }
+
+ var stringTable = block.getStringtable();
+ var strings = new String[stringTable.getSCount()];
+ for (int i = 0; i < strings.length; i++) {
+ strings[i] = stringTable.getS(i).toStringUtf8();
+ }
+ return new Block(blob, block, strings);
+
+ } catch (InvalidProtocolBufferException e) {
+ throw new FileFormatException(e);
+ }
+ }
+
+ private final Blob blob;
+ private final Osmformat.PrimitiveBlock primitiveBlock;
+ private final String[] strings;
+ private final Map blockTags = new HashMap<>();
+ private final Map blockRoles = new HashMap<>();
+
+ private Block(Blob blob, Osmformat.PrimitiveBlock block, String[] strings) {
+ this.blob = blob;
+ this.primitiveBlock = block;
+ this.strings = strings;
+ }
+
+ public Blob getBlob() {
+ return blob;
+ }
+
+ public List> entities() {
+ return primitiveBlock.getPrimitivegroupList().stream()
+ .flatMap(this::groupToEntities)
+ .filter(not(List::isEmpty))
+ .collect(toList());
+ }
+
+ private Stream> groupToEntities(Osmformat.PrimitiveGroup group) {
+ return Stream.of(
+ denseToEntities(group),
+ group.getNodesList().stream().map(this::parse),
+ group.getWaysList().stream().map(this::parse),
+ group.getRelationsList().stream().map(this::parse))
+ .map(stream -> stream.collect(toList()));
+ }
+
+ private Stream denseToEntities(PrimitiveGroup group) {
+ if (!group.hasDense()) {
+ return Stream.empty();
+ }
+ var dense = group.getDense();
+ var itr = new DenseIterator(dense);
+ return stream(spliterator(itr, dense.getIdCount(), Spliterator.ORDERED), false);
+ }
+
+ public Map getBlockTags() {
+ return blockTags;
+ }
+
+ public Map getBlockRoles() {
+ return blockRoles;
+ }
+
+ private class DenseIterator implements Iterator {
+
+ private final Osmformat.DenseNodes dense;
+
+ private final List versions;
+ private final List timestamps;
+ private final List changesets;
+ private final List users;
+ private final IntFunction visibilities;
+ private final IntFunction> keysVals;
+
+ private long id;
+ private long timestamp;
+ private long changeset;
+ private int user;
+ private long lon;
+ private long lat;
+
+ private int next = 0;
+
+ public DenseIterator(Osmformat.DenseNodes dense) {
+ this.dense = dense;
+ if (!dense.hasDenseinfo()) {
+ throw new OSHDBException("entity info is required for oshdb");
+ }
+
+ var info = dense.getDenseinfo();
+ versions = info.getVersionList();
+ timestamps = info.getTimestampList();
+ changesets = info.getChangesetList();
+ users = info.getUidList();
+ if (!info.getVisibleList().isEmpty()) {
+ visibilities = info.getVisibleList()::get;
+ } else {
+ visibilities = x -> true;
+ }
+
+ if (dense.getKeysValsList().isEmpty()) {
+ keysVals = x -> Collections.emptyList();
+ } else {
+ this.keysVals = buildKeyVals(dense)::get;
+ }
+ }
+
+ private List> buildKeyVals(DenseNodes dense) {
+ var list = new ArrayList>(dense.getIdCount());
+ var tags = new ArrayList();
+ for (var i = 0; i < dense.getKeysValsCount(); i++) {
+ var key = dense.getKeysVals(i);
+ if (key == 0) {
+ addToBlockTags(tags);
+ list.add(List.copyOf(tags));
+ tags.clear();
+ } else {
+ var val = dense.getKeysVals(++i);
+ tags.add(new OSHDBTag(key, val));
+ }
+ }
+ return list;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next < dense.getIdCount();
+ }
+
+ @Override
+ public OSMEntity next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ return getNext(next++);
+ }
+
+ private OSMEntity getNext(int index) {
+ id += dense.getId(index);
+ timestamp += timestamps.get(index);
+ changeset += changesets.get(index);
+ user += users.get(index);
+
+ var visible = TRUE.equals(visibilities.apply(index)) ? 1 : -1;
+ var version = versions.get(index) * visible;
+
+ var tags = keysVals.apply(index);
+ lon += dense.getLon(index);
+ lat += dense.getLat(index);
+ return OSM.node(id, version, timestamp, changeset, user, tags, toIntExact(lon),
+ toIntExact(lat));
+ }
+ }
+
+ private OSMNode parse(Osmformat.Node entity) {
+ var id = entity.getId();
+ var lon = entity.getLon();
+ var lat = entity.getLat();
+
+ return withInfo(entity.getKeysList(), entity.getValsList(), entity.getInfo(),
+ (timestamp, changeset, user, version, tags) ->
+ OSM.node(id, version, timestamp, changeset, user, tags, toIntExact(lon),
+ toIntExact(lat)));
+ }
+
+ private OSMWay parse(Osmformat.Way entity) {
+ var id = entity.getId();
+ var members = new OSMMember[entity.getRefsCount()];
+ var memId = 0L;
+ for (var i = 0; i < members.length; i++) {
+ memId += entity.getRefs(i);
+ members[i] = new OSMMember(memId, OSMType.NODE, -1);
+ }
+ return withInfo(entity.getKeysList(), entity.getValsList(), entity.getInfo(),
+ (timestamp, changeset, user, version, tags) ->
+ OSM.way(id, version, timestamp, changeset, user, tags, members));
+ }
+
+ private OSMRelation parse(Osmformat.Relation entity) {
+ var id = entity.getId();
+ var members = new OSMMember[entity.getMemidsCount()];
+ var memId = 0L;
+ var roles = new HashSet();
+ for (var i = 0; i < members.length; i++) {
+ memId += entity.getMemids(i);
+ var type = entity.getTypes(i);
+ var role = entity.getRolesSid(i);
+ var member = new OSMMember(memId, OSMType.fromInt(type.getNumber()), role);
+ roles.add(member.getRole());
+ members[i] = member;
+ }
+ addToBlockRoles(roles);
+ return withInfo(entity.getKeysList(), entity.getValsList(), entity.getInfo(),
+ (timestamp, changeset, user, version, tags) ->
+ OSM.relation(id, version, timestamp, changeset, user, tags, members));
+ }
+
+
+ private T withInfo(List keys, List values, Osmformat.Info info,
+ EntityInfo metadata) {
+ var timestamp = info.getTimestamp();
+ var changeset = info.getChangeset();
+ var user = info.getUid();
+
+ var visible = info.hasVisible() && !info.getVisible() ? -1 : 1;
+ var version = info.getVersion() * visible;
+
+ var tags = new ArrayList(keys.size());
+ for (var i = 0; i < keys.size(); i++) {
+ tags.add(new OSHDBTag(keys.get(i), values.get(i)));
+ }
+ addToBlockTags(tags);
+ return metadata.apply(timestamp, changeset, user, version, tags);
+ }
+
+ private interface EntityInfo {
+ T apply(long timestamp, long changeset, int user, int version, List tags);
+ }
+
+ private void addToBlockTags(List tags) {
+ tags.forEach(tag -> blockTags.computeIfAbsent(tag,this::toOSMTag));
+ }
+
+ private OSMTag toOSMTag(OSHDBTag tag) {
+ return new OSMTag(strings[tag.getKey()], strings[tag.getValue()]);
+ }
+
+ private void addToBlockRoles(Set roles) {
+ roles.forEach(role -> blockRoles.computeIfAbsent(role, this::toOSMRole));
+ }
+
+ private OSMRole toOSMRole(OSHDBRole role) {
+ return new OSMRole(strings[role.getId()]);
+ }
+}
diff --git a/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/pbf/OSMPbfSource.java b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/pbf/OSMPbfSource.java
new file mode 100644
index 000000000..4feb8741e
--- /dev/null
+++ b/oshdb-osm-source/src/main/java/org/heigit/ohsome/oshdb/osm/pbf/OSMPbfSource.java
@@ -0,0 +1,172 @@
+package org.heigit.ohsome.oshdb.osm.pbf;
+
+import static java.util.stream.Collectors.toList;
+
+import com.google.common.collect.Maps;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.List;
+import java.util.Map;
+import java.util.function.UnaryOperator;
+import java.util.stream.Stream;
+import org.heigit.ohsome.oshdb.OSHDBRole;
+import org.heigit.ohsome.oshdb.OSHDBTag;
+import org.heigit.ohsome.oshdb.TagTranslator;
+import org.heigit.ohsome.oshdb.osm.OSM;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMMember;
+import org.heigit.ohsome.oshdb.osm.OSMNode;
+import org.heigit.ohsome.oshdb.osm.OSMRelation;
+import org.heigit.ohsome.oshdb.osm.OSMSource;
+import org.heigit.ohsome.oshdb.osm.OSMWay;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.SynchronousSink;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+public class OSMPbfSource implements OSMSource {
+
+ private final Path path;
+
+ public OSMPbfSource(Path path) {
+ this.path = path;
+ }
+
+ @Override
+ public Flux entities(TagTranslator translator) {
+ return Flux.using(this::openSource,
+ source -> entities(source, translator),
+ this::closeQuietly);
+ }
+
+ private InputStream openSource() throws IOException {
+ return Files.newInputStream(path);
+ }
+
+ private void closeQuietly(InputStream input) {
+ try {
+ input.close();
+ } catch (IOException e) {
+ // deliberately ignored: closing the source is best effort
+ }
+ }
+
+ private Flux entities(InputStream source, TagTranslator tagTranslator) {
+ return Flux.using(() -> Schedulers.newParallel("io"),
+ scheduler -> blobs(source)
+ .filter(Blob::isData)
+ .flatMapSequential(blob -> entities(blob, tagTranslator).subscribeOn(scheduler)),
+ Scheduler::dispose);
+ }
+
+ private Flux blobs(InputStream source) {
+ return Flux.generate(sink -> readBlob(source, sink));
+ }
+
+ private static void readBlob(InputStream source, SynchronousSink sink) {
+ try {
+ sink.next(Blob.read(source));
+ } catch (EOFException e) {
+ sink.complete();
+ } catch (IOException e) {
+ sink.error(e);
+ }
+ }
+
+ private Flux entities(Blob blob, TagTranslator translator) {
+ return blob.block().flatMapMany(block -> entities(block, translator));
+ }
+
+ private Flux entities(Block block, TagTranslator translator) {
+ var entities = block.entities();
+ var tags = mapTags(block, translator);
+ var roles = mapRoles(block, translator);
+ return Flux.fromIterable(entities)
+ .map(list -> rebuild(list, tags, roles))
+ .concatMap(Flux::fromStream);
+ }
+
+ private Map mapTags(Block block, TagTranslator translator) {
+ var tags = block.getBlockTags();
+ var translated = translator.getOSHDBTagOf(tags.values());
+ var mapping = Maps.newHashMapWithExpectedSize(tags.size());
+ tags.forEach((oshdb, osm) -> mapping.put(oshdb, translated.get(osm)));
+ return mapping;
+ }
+
+ private Map mapRoles(Block block, TagTranslator translator) {
+ var roles = block.getBlockRoles();
+ var translated = translator.getOSHDBRoleOf(roles.values());
+ var mapping = Maps.newHashMapWithExpectedSize(roles.size());
+ roles.forEach((oshdb, osm) -> mapping.put(oshdb, translated.get(osm)));
+ return mapping;
+ }
+
+ private Stream rebuild(List list, Map mappingTags,
+ Map mappingRoles) {
+ var type = list.get(0).getType();
+ switch (type) {
+ case NODE:
+ return rebuild(list, OSMNode.class, osm -> rebuild(osm, mappingTags));
+ case WAY:
+ return rebuild(list, OSMWay.class, osm -> rebuild(osm, mappingTags));
+ case RELATION:
+ return rebuild(list, OSMRelation.class, osm -> rebuild(osm, mappingTags, mappingRoles));
+ default:
+ throw new IllegalStateException();
+ }
+ }
+
+ private Stream rebuild(List list, Class type,
+ UnaryOperator rebuild) {
+ return list.stream().map(type::cast).map(rebuild);
+ }
+
+ private OSMNode rebuild(OSMNode osm, Map mappingTags) {
+ return rebuild(osm, mappingTags, (id, version, timestamp, changeset, user, tags) ->
+ OSM.node(id, version, timestamp, changeset, user, tags, osm.getLon(), osm.getLat()));
+ }
+
+ private OSMWay rebuild(OSMWay osm, Map mappingTags) {
+ return rebuild(osm, mappingTags, (id, version, timestamp, changeset, user, tags) ->
+ OSM.way(id, version, timestamp, changeset, user, tags, osm.getMembers()));
+ }
+
+ private OSMRelation rebuild(OSMRelation osm, Map mappingTags,
+ Map mappingRoles) {
+ return rebuild(osm, mappingTags, (id, version, timestamp, changeset, user, tags) ->
+ OSM.relation(id, version, timestamp, changeset, user, tags,
+ rebuildMembers(osm, mappingRoles)));
+ }
+
+  /** Builds a fresh member array whose roles are translated through {@code mappingRoles}. */
+  private OSMMember[] rebuildMembers(OSMRelation osm, Map<OSHDBRole, OSHDBRole> mappingRoles) {
+    var source = osm.getMembers();
+    // Write into a copy so the array handed out by the source entity stays untouched.
+    var rebuilt = new OSMMember[source.length];
+    for (var i = 0; i < source.length; i++) {
+      var role = mappingRoles.get(source[i].getRole());
+      rebuilt[i] = new OSMMember(source[i].getId(), source[i].getType(), role.getId());
+    }
+    return rebuilt;
+  }
+
+ private T rebuild(T entity, Map mappingTags,
+ EntityCommon entityCommon) {
+ var id = entity.getId();
+ var visible = entity.isVisible() ? 1 : -1;
+ var version = entity.getVersion() * visible;
+ var timestamp = entity.getEpochSecond();
+ var changeset = entity.getChangesetId();
+ var user = entity.getUserId();
+ var tags = entity.getTags().stream().map(mappingTags::get).sorted().collect(toList());
+ return entityCommon.apply(id, version, timestamp, changeset, user, tags);
+ }
+
+ private interface EntityCommon {
+ T apply(long id, int version, long timestamp, long changeset, int user, List tags);
+ }
+}
diff --git a/oshdb-osm-source/src/test/java/org/heigit/ohsome/oshdb/mock/MockTranslator.java b/oshdb-osm-source/src/test/java/org/heigit/ohsome/oshdb/mock/MockTranslator.java
new file mode 100644
index 000000000..4a302c762
--- /dev/null
+++ b/oshdb-osm-source/src/test/java/org/heigit/ohsome/oshdb/mock/MockTranslator.java
@@ -0,0 +1,77 @@
+package org.heigit.ohsome.oshdb.mock;
+
+import com.google.common.collect.Maps;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.heigit.ohsome.oshdb.OSHDBRole;
+import org.heigit.ohsome.oshdb.OSHDBTag;
+import org.heigit.ohsome.oshdb.TagTranslator;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMRole;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMTag;
+
+public class MockTranslator implements TagTranslator {
+ private final Map translate = new ConcurrentHashMap<>();
+ private final Map lookup = new ConcurrentHashMap<>();
+ private final Map translateRole = new ConcurrentHashMap<>();
+ private final Map lookupRole = new ConcurrentHashMap<>();
+ private final Map stringIds = new ConcurrentHashMap<>();
+ private final AtomicInteger nextStringId = new AtomicInteger();
+
+ @Override
+ public Map getOSHDBTagOf(Collection extends OSMTag> tags) {
+ var map = Maps.newHashMapWithExpectedSize(tags.size());
+ tags.forEach(tag -> map.put(tag, translate(tag)));
+ return map;
+ }
+
+ @Override
+ public Map getOSHDBRoleOf(Collection extends OSMRole> roles) {
+ var map = Maps.newHashMapWithExpectedSize(roles.size());
+ roles.forEach(role -> map.put(role, translate(role)));
+ return map;
+ }
+
+ @Override
+ public Map lookupTag(Set extends OSHDBTag> tags) {
+ var map = Maps.newHashMapWithExpectedSize(tags.size());
+ tags.forEach(tag -> map.put(tag, lookup.get(tag)));
+ return map;
+ }
+
+ @Override
+ public Map lookupRole(Set extends OSHDBRole> roles) {
+ var map = Maps.newHashMapWithExpectedSize(roles.size());
+ roles.forEach(role -> map.put(role, lookupRole.get(role)));
+ return map;
+ }
+
+ private OSHDBTag translate(OSMTag tag){
+ return translate.computeIfAbsent(tag, this::tag);
+ }
+
+ private OSHDBTag tag(OSMTag tag) {
+ var key = string(tag.getKey());
+ var val = string(tag.getValue());
+ var oshdb = new OSHDBTag(key,val);
+ lookup.put(oshdb, tag);
+ return oshdb;
+ }
+
+ private OSHDBRole translate(OSMRole role) {
+ return translateRole.computeIfAbsent(role, this::role);
+ }
+
+ private OSHDBRole role(OSMRole role) {
+ var oshdb = OSHDBRole.of(string(role.toString()));
+ lookupRole.put(oshdb, role);
+ return oshdb;
+ }
+
+ private int string(String string) {
+ return stringIds.computeIfAbsent(string, x -> nextStringId.getAndIncrement());
+ }
+
+}
diff --git a/oshdb-osm-source/src/test/java/org/heigit/ohsome/oshdb/osm/pbf/OSMPbfSourceTest.java b/oshdb-osm-source/src/test/java/org/heigit/ohsome/oshdb/osm/pbf/OSMPbfSourceTest.java
new file mode 100644
index 000000000..f815fad31
--- /dev/null
+++ b/oshdb-osm-source/src/test/java/org/heigit/ohsome/oshdb/osm/pbf/OSMPbfSourceTest.java
@@ -0,0 +1,120 @@
+package org.heigit.ohsome.oshdb.osm.pbf;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.heigit.ohsome.oshdb.TagTranslator;
+import org.heigit.ohsome.oshdb.mock.MockTranslator;
+import org.heigit.ohsome.oshdb.osm.OSMEntity;
+import org.heigit.ohsome.oshdb.osm.OSMMember;
+import org.heigit.ohsome.oshdb.osm.OSMNode;
+import org.heigit.ohsome.oshdb.osm.OSMRelation;
+import org.heigit.ohsome.oshdb.osm.OSMType;
+import org.heigit.ohsome.oshdb.osm.OSMWay;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMRole;
+import org.heigit.ohsome.oshdb.util.tagtranslator.OSMTag;
+import org.junit.jupiter.api.Test;
+
+class OSMPbfSourceTest {
+
+ private static final Path testResources = Paths.get("src", "test", "resources");
+ private static final Path SAMPLE_PBF = testResources.resolve("sample.pbf");
+
+  @Test
+  void openSource() {
+    // Sanity check that the fixture shared by the tests in this class is present.
+    assertTrue(Files.exists(SAMPLE_PBF));
+  }
+
+ @Test
+ void entities() {
+ var source = new OSMPbfSource(SAMPLE_PBF);
+ var tagTranslator = new MockTranslator();
+ var count = source.entities(tagTranslator).limitRate(10).count().block();
+ assertEquals(339, count);
+
+ var entities = new EnumMap>(OSMType.class);
+
+ source.entities(tagTranslator)
+ .bufferUntilChanged(OSMEntity::getType)
+ .doOnNext(list -> {
+ var type = list.get(0).getType();
+ var map = entities.computeIfAbsent(type, x -> new HashMap<>());
+ list.forEach(entity -> map.put(entity.getId(), entity));
+ })
+ .then().block();
+ assertEquals(3, entities.size());
+
+ OSMEntity sample;
+
+ sample = entities.get(OSMType.NODE).get(647105170L);
+ assertNotNull(sample);
+ assertEquals(2, sample.getVersion());
+ assertEquals(4210769, sample.getChangesetId());
+ assertEquals(1269340001, sample.getEpochSecond());
+ assertEquals(234999, sample.getUserId());
+ assertEquals(-2344645, ((OSMNode) sample).getLon());
+ assertEquals(517635905, ((OSMNode) sample).getLat());
+ assertEquals(0, sample.getTags().size());
+
+ sample = entities.get(OSMType.WAY).get(49161822L);
+ assertNotNull(sample);
+ assertEquals(1, sample.getVersion());
+ assertEquals(3754726, sample.getChangesetId());
+ assertEquals(1264885069, sample.getEpochSecond());
+ assertEquals(508, sample.getUserId());
+ assertThat(Set.copyOf(tagTranslator.lookupTag(sample.getTags()).values()))
+ .hasSize(2)
+ .containsAll(List.of(
+ new OSMTag("highway", "residential"),
+ new OSMTag("name", "Worcester Road")
+ ));
+ assertThat(((OSMWay) sample).getMembers())
+ .hasSize(5)
+ .containsSequence(
+ new OSMMember(30983851, OSMType.NODE, -1),
+ new OSMMember(623624257, OSMType.NODE, -1),
+ new OSMMember(623624154, OSMType.NODE, -1),
+ new OSMMember(623624259, OSMType.NODE, -1),
+ new OSMMember(623624261, OSMType.NODE, -1));
+
+ sample = entities.get(OSMType.RELATION).get(31640L);
+
+ assertNotNull(sample);
+ assertEquals(81, sample.getVersion());
+ assertEquals(11640673, sample.getChangesetId());
+ assertEquals(1337419064, sample.getEpochSecond());
+ assertEquals(24119, sample.getUserId());
+ assertThat(Set.copyOf(tagTranslator.lookupTag(sample.getTags()).values()))
+ .hasSize(5)
+ .containsAll(List.of(
+ new OSMTag("type", "route"),
+ new OSMTag("ref", "61"),
+ new OSMTag("route", "bicycle"),
+ new OSMTag("network", "ncn"),
+ new OSMTag("name", "NCN National Route 61")
+ ));
+
+ assertThat(((OSMRelation) sample).getMembers())
+ .hasSize(234)
+ .contains(
+ new OSMMember(25896435, OSMType.WAY, roleIdOf("forward", tagTranslator)),
+ new OSMMember(121267847, OSMType.WAY, roleIdOf("", tagTranslator)));
+ }
+
+ private int roleIdOf(String role, TagTranslator translator) {
+ var osm = new OSMRole(role);
+ return translator.getOSHDBRoleOf(Set.of(osm)).get(osm).getId();
+ }
+
+}
\ No newline at end of file
diff --git a/oshdb-osm-source/src/test/resources/sample.pbf b/oshdb-osm-source/src/test/resources/sample.pbf
new file mode 100644
index 000000000..8a22edfee
Binary files /dev/null and b/oshdb-osm-source/src/test/resources/sample.pbf differ
diff --git a/pom.xml b/pom.xml
index 6856123b1..34f034859 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,6 +28,7 @@
oshdb-api-ignite
oshdb-tool
oshdb-helpers
+ oshdb-osm-source