Skip to content

Commit eeaa8e8

Browse files
akolesnikov authored and copybara-github committed
Move fast_pipeline shared object naming into shared functions.
PiperOrigin-RevId: 638748189
1 parent 8cbd49f commit eeaa8e8

8 files changed

+123
-28
lines changed

build_and_test.sh

+1 -1
Original file line number | Diff line number | Diff line change
@@ -54,7 +54,7 @@ g++ -std=c++14 -shared \
5454
-o deepvariant/examples_from_stream.so \
5555
-fPIC \
5656
-l:libtensorflow_framework.so.2 \
57-
-I/home/koles/deepvariant \
57+
-I. \
5858
${TF_CFLAGS[@]} \
5959
${TF_LFLAGS[@]} \
6060
-D_GLIBCXX_USE_CXX11_ABI=1 \

build_release_binaries.sh

+1 -1
Original file line number | Diff line number | Diff line change
@@ -114,7 +114,7 @@ g++ -std=c++14 -shared \
114114
-o deepvariant/examples_from_stream.so \
115115
-fPIC \
116116
-l:libtensorflow_framework.so.2 \
117-
-I/home/koles/deepvariant \
117+
-I. \
118118
${TF_CFLAGS[@]} \
119119
${TF_LFLAGS[@]} \
120120
-D_GLIBCXX_USE_CXX11_ABI=1 \

deepvariant/BUILD

+12
Original file line number | Diff line number | Diff line change
@@ -1566,6 +1566,7 @@ cc_library(
15661566
hdrs = ["make_examples_native.h"],
15671567
deps = [
15681568
":alt_aligned_pileup_lib",
1569+
":fast_pipeline_utils",
15691570
":pileup_image_native",
15701571
":stream_examples",
15711572
"//deepvariant/protos:deepvariant_cc_pb2",
@@ -1664,6 +1665,7 @@ cc_library(
16641665
srcs = ["stream_examples.cc"],
16651666
hdrs = ["stream_examples.h"],
16661667
deps = [
1668+
":fast_pipeline_utils",
16671669
":pileup_image_native",
16681670
"//deepvariant/protos:deepvariant_cc_pb2",
16691671
"//third_party/nucleus/protos:variants_cc_pb2",
@@ -1681,9 +1683,19 @@ cc_binary(
16811683
"fast_pipeline.h",
16821684
],
16831685
deps = [
1686+
":fast_pipeline_utils",
16841687
"@com_google_absl//absl/flags:flag",
16851688
"@com_google_absl//absl/flags:parse",
16861689
"@com_google_absl//absl/log",
16871690
"@com_google_absl//absl/strings",
16881691
],
16891692
)
1693+
1694+
cc_library(
1695+
name = "fast_pipeline_utils",
1696+
srcs = ["fast_pipeline_utils.h"],
1697+
hdrs = ["fast_pipeline_utils.h"],
1698+
deps = [
1699+
"@com_google_absl//absl/strings",
1700+
],
1701+
)

deepvariant/fast_pipeline.cc

+22 -12
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@
3939
#include <string>
4040

4141

42+
#include "deepvariant/fast_pipeline_utils.h"
4243
#include "absl/flags/flag.h"
4344
#include "absl/flags/parse.h"
4445
#include "absl/log/log.h"
@@ -54,9 +55,6 @@
5455
#include "boost/process.hpp" // NOLINT
5556
#include "boost/process/search_path.hpp" // NOLINT
5657

57-
namespace bp = boost::process;
58-
namespace bi = boost::interprocess;
59-
6058
ABSL_FLAG(std::string, make_example_flags, "",
6159
"file containing make_examples flags");
6260
ABSL_FLAG(std::string, call_variants_flags, "",
@@ -66,6 +64,13 @@ ABSL_FLAG(int, num_shards, 0, "number of make_examples shards");
6664
ABSL_FLAG(int, buffer_size, 10485760,
6765
"Shared memory buffer size for each shard, default is 10MB");
6866

67+
namespace learning {
68+
namespace genomics {
69+
namespace deepvariant {
70+
71+
namespace bp = boost::process;
72+
namespace bi = boost::interprocess;
73+
6974
FastPipeline::FastPipeline(int num_shards, int buffer_size,
7075
absl::string_view shm_prefix,
7176
absl::string_view path_to_make_examples_flags,
@@ -106,7 +111,7 @@ FastPipeline::FastPipeline(int num_shards, int buffer_size,
106111
void FastPipeline::SetGlobalObjects() {
107112
for (int shard = 0; shard < num_shards_; ++shard) {
108113
// Create shared memory buffers.
109-
std::string shard_shm_name = absl::StrCat(shm_prefix_, "_shm_", shard);
114+
std::string shard_shm_name = GetShmBufferName(shm_prefix_, shard);
110115
shm_[shard] =
111116
std::make_unique<bi::shared_memory_object>(bi::shared_memory_object(
112117
bi::open_or_create, shard_shm_name.data(), bi::read_write));
@@ -115,17 +120,17 @@ void FastPipeline::SetGlobalObjects() {
115120
LOG(INFO) << "Creating buffer_empty mutex";
116121
buffer_empty_[shard] = std::make_unique<bi::named_mutex>(
117122
bi::open_or_create,
118-
absl::StrCat(shm_prefix_, "_buffer_empty_", shard).data());
123+
GetBufferEmptyMutexName(shm_prefix_, shard).data());
119124
// Create mutex signalling that items are available in the buffer.
120125
LOG(INFO) << "Creating items_available mutex";
121126
items_available_[shard] = std::make_unique<bi::named_mutex>(
122127
bi::open_or_create,
123-
absl::StrCat(shm_prefix_, "_items_available_", shard).data());
128+
GetItemsAvailableMutexName(shm_prefix_, shard).data());
124129
// Create mutex signalling that shard is finished.
125130
LOG(INFO) << "Creating shard_finished mutex";
126131
make_examples_shard_finished_[shard] = std::make_unique<bi::named_mutex>(
127132
bi::open_or_create,
128-
absl::StrCat(shm_prefix_, "_shard_finished_", shard).data());
133+
GetShardFinishedMutexName(shm_prefix_, shard).data());
129134
}
130135
}
131136

@@ -135,13 +140,13 @@ void FastPipeline::ClearGlobalObjects() {
135140
buffer_empty_[shard].release();
136141
items_available_[shard].release();
137142
make_examples_shard_finished_[shard].release();
138-
shm_[shard]->remove(absl::StrCat(shm_prefix_, "_shm_", shard).data());
143+
shm_[shard]->remove(GetShmBufferName(shm_prefix_, shard).data());
139144
buffer_empty_[shard]->remove(
140-
absl::StrCat(shm_prefix_, "_buffer_empty_", shard).data());
145+
GetBufferEmptyMutexName(shm_prefix_, shard).data());
141146
items_available_[shard]->remove(
142-
absl::StrCat(shm_prefix_, "_items_available_", shard).data());
147+
GetItemsAvailableMutexName(shm_prefix_, shard).data());
143148
make_examples_shard_finished_[shard]->remove(
144-
absl::StrCat(shm_prefix_, "_shard_finished_", shard).data());
149+
GetShardFinishedMutexName(shm_prefix_, shard).data());
145150
}
146151
}
147152

@@ -206,6 +211,10 @@ void RunFastPipeline(absl::string_view dv_bin_path) {
206211
fast_pipeline.ClearGlobalObjects();
207212
}
208213

214+
} // namespace deepvariant
215+
} // namespace genomics
216+
} // namespace learning
217+
209218
int main(int argc, char** argv) {
210219

211220
absl::ParseCommandLine(argc, argv);
@@ -224,5 +233,6 @@ int main(int argc, char** argv) {
224233
// 5. call_variants_flags file exists
225234
// 6. No SHM files with the same prefix exist.
226235

227-
RunFastPipeline(dv_bin_path);
236+
learning::genomics::deepvariant::RunFastPipeline(dv_bin_path);
237+
return EXIT_SUCCESS;
228238
}

deepvariant/fast_pipeline.h

+8
Original file line number | Diff line number | Diff line change
@@ -40,6 +40,10 @@
4040
#include "boost/interprocess/sync/named_mutex.hpp" // NOLINT
4141
#include "boost/process.hpp" // NOLINT
4242

43+
namespace learning {
44+
namespace genomics {
45+
namespace deepvariant {
46+
4347
class FastPipeline {
4448
public:
4549
FastPipeline(int num_shards, int buffer_size, absl::string_view shm_prefix,
@@ -72,4 +76,8 @@ class FastPipeline {
7276
std::vector<std::string> call_variants_flags_;
7377
};
7478

79+
} // namespace deepvariant
80+
} // namespace genomics
81+
} // namespace learning
82+
7583
#endif // LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_H_

deepvariant/fast_pipeline_utils.h

+67
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2024 Google LLC.
3+
*
4+
* Redistribution and use in source and binary forms, with or without
5+
* modification, are permitted provided that the following conditions
6+
* are met:
7+
*
8+
* 1. Redistributions of source code must retain the above copyright notice,
9+
* this list of conditions and the following disclaimer.
10+
*
11+
* 2. Redistributions in binary form must reproduce the above copyright
12+
* notice, this list of conditions and the following disclaimer in the
13+
* documentation and/or other materials provided with the distribution.
14+
*
15+
* 3. Neither the name of the copyright holder nor the names of its
16+
* contributors may be used to endorse or promote products derived from this
17+
* software without specific prior written permission.
18+
*
19+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22+
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
23+
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
24+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
25+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
26+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
27+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
28+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
29+
* POSSIBILITY OF SUCH DAMAGE.
30+
*/
31+
32+
#ifndef LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_UTILS_H_
33+
#define LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_UTILS_H_
34+
35+
#include <string>
36+
37+
#include "absl/strings/str_cat.h"
38+
#include "absl/strings/string_view.h"
39+
40+
namespace learning {
41+
namespace genomics {
42+
namespace deepvariant {
43+
44+
inline std::string GetShmBufferName(absl::string_view prefix, int shard) {
45+
return absl::StrCat(prefix, "_shm_", shard);
46+
}
47+
48+
inline std::string GetBufferEmptyMutexName(absl::string_view prefix,
49+
int shard) {
50+
return absl::StrCat(prefix, "_buffer_empty_", shard);
51+
}
52+
53+
inline std::string GetItemsAvailableMutexName(absl::string_view prefix,
54+
int shard) {
55+
return absl::StrCat(prefix, "_items_available_", shard);
56+
}
57+
58+
inline std::string GetShardFinishedMutexName(absl::string_view prefix,
59+
int shard) {
60+
return absl::StrCat(prefix, "_shard_finished_", shard);
61+
}
62+
63+
} // namespace deepvariant
64+
} // namespace genomics
65+
} // namespace learning
66+
67+
#endif // LEARNING_GENOMICS_DEEPVARIANT_FAST_PIPELINE_UTILS_H_

deepvariant/stream_examples.cc

+5 -4
Original file line number | Diff line number | Diff line change
@@ -41,6 +41,7 @@
4141
#include "absl/strings/str_cat.h"
4242
#include "absl/strings/string_view.h"
4343
#include "third_party/nucleus/protos/variants.pb.h"
44+
#include "deepvariant/fast_pipeline_utils.h"
4445

4546
namespace learning {
4647
namespace genomics {
@@ -61,7 +62,7 @@ StreamExamples::StreamExamples(
6162
}
6263
absl::string_view shm_prefix = options.shm_prefix();
6364
// Name of the shared memory buffer.
64-
shm_name_ = absl::StrCat(shm_prefix, "_shm_", options_.task_id());
65+
shm_name_ = GetShmBufferName(shm_prefix, options_.task_id());
6566
shm_buffer_size_ = options_.shm_buffer_size();
6667

6768
// Open an existing shared memory buffer.
@@ -78,16 +79,16 @@ StreamExamples::StreamExamples(
7879
LOG(INFO) << "Creating buffer_empty mutex";
7980
buffer_empty_ = std::make_unique<boost::interprocess::named_mutex>(
8081
boost::interprocess::open_only,
81-
absl::StrCat(shm_prefix, "_buffer_empty_", options_.task_id()).data());
82+
GetBufferEmptyMutexName(shm_prefix, options_.task_id()).data());
8283
LOG(INFO) << "Creating items_available mutex";
8384
items_available_ = std::make_unique<boost::interprocess::named_mutex>(
8485
boost::interprocess::open_only,
85-
absl::StrCat(shm_prefix, "_items_available_", options_.task_id()).data());
86+
GetItemsAvailableMutexName(shm_prefix, options_.task_id()).data());
8687
items_available_->lock();
8788
LOG(INFO) << "Creating shard_finished mutex";
8889
shard_finished_ = std::make_unique<boost::interprocess::named_mutex>(
8990
boost::interprocess::open_only,
90-
absl::StrCat(shm_prefix, "_shard_finished_", options_.task_id()).data());
91+
GetShardFinishedMutexName(shm_prefix, options_.task_id()).data());
9192
shard_finished_->lock();
9293
}
9394

deepvariant/stream_examples_kernel.cc

+7 -10
Original file line number | Diff line number | Diff line change
@@ -61,13 +61,14 @@
6161
#include "tensorflow/core/platform/types.h"
6262
#include "tensorflow/tsl/platform/errors.h"
6363
#include "tensorflow/tsl/platform/thread_annotations.h"
64+
#include "deepvariant/fast_pipeline_utils.h"
6465

6566
namespace learning {
6667
namespace genomics {
6768
namespace deepvariant {
6869

6970
// This class implements a custom TensorFlow op that reads data from shared
70-
// memory files and returns a batch of examples. Batches are variable length.
71+
// memory buffers and returns a batch of examples. Batches are variable length.
7172
// Examples are read from shared memory buffers (one for each shard).
7273
// Examples are written in the following format:
7374
// int: length of alt_indices_encoded
@@ -98,11 +99,10 @@ class StreamExamplesResource : public tensorflow::ResourceBase {
9899
make_examples_shard_finished_.resize(num_shards);
99100
num_shards_ = num_shards;
100101
for (int shard = 0; shard < num_shards; shard++) {
101-
// TODO Move name generation into a shared function.
102-
std::string shm_name = absl::StrCat(shm_prefix, "_shm_", shard);
103102
shm_[shard] = std::make_unique<boost::interprocess::shared_memory_object>(
104103
boost::interprocess::shared_memory_object(
105-
boost::interprocess::open_only, shm_name.data(),
104+
boost::interprocess::open_only,
105+
GetShmBufferName(shm_prefix, shard).data(),
106106
boost::interprocess::read_write));
107107
shm_region_[shard] = std::make_unique<boost::interprocess::mapped_region>(
108108
*shm_[shard], boost::interprocess::read_write);
@@ -112,18 +112,15 @@ class StreamExamplesResource : public tensorflow::ResourceBase {
112112
// Init mutexes
113113
buffer_empty_[shard] = std::make_unique<boost::interprocess::named_mutex>(
114114
boost::interprocess::open_only,
115-
// TODO Move name generation into a shared function.
116-
absl::StrCat(shm_prefix, "_buffer_empty_", shard).data());
115+
GetBufferEmptyMutexName(shm_prefix, shard).data());
117116
items_available_[shard] =
118117
std::make_unique<boost::interprocess::named_mutex>(
119118
boost::interprocess::open_only,
120-
// TODO Move name generation into a shared function.
121-
absl::StrCat(shm_prefix, "_items_available_", shard).data());
119+
GetItemsAvailableMutexName(shm_prefix, shard).data());
122120
make_examples_shard_finished_[shard] =
123121
std::make_unique<boost::interprocess::named_mutex>(
124122
boost::interprocess::open_only,
125-
// TODO Move name generation into a shared function.
126-
absl::StrCat(shm_prefix, "_shard_finished_", shard).data());
123+
GetShardFinishedMutexName(shm_prefix, shard).data());
127124
}
128125

129126
return tensorflow::Status();

0 commit comments

Comments
 (0)