Skip to content

Commit 11950f6

Browse files
Implement Besu PrivateLog flowable subscription (LFDT-web3j#1198)
* Change visibility of some fields/methods in Filter class (default -> protected) * Added Besu PrivateLogFilter * Added Besu PrivateLogFilter (2) * Spotless
1 parent 84bb820 commit 11950f6

File tree

13 files changed

+223
-22
lines changed

13 files changed

+223
-22
lines changed

besu/src/main/java/org/web3j/protocol/besu/Besu.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.io.IOException;
1616
import java.util.List;
1717
import java.util.Map;
18+
import java.util.concurrent.ScheduledExecutorService;
1819

1920
import org.web3j.crypto.Credentials;
2021
import org.web3j.protocol.Web3jService;
@@ -40,11 +41,34 @@
4041
import org.web3j.protocol.exceptions.TransactionException;
4142
import org.web3j.utils.Base64String;
4243

43-
public interface Besu extends Eea {
44+
public interface Besu extends Eea, BesuRx {
45+
46+
/**
47+
* Construct a new Besu instance.
48+
*
49+
* @param web3jService web3j service instance - i.e. HTTP
50+
* @return new Besu instance
51+
*/
4452
static Besu build(Web3jService web3jService) {
4553
return new JsonRpc2_0Besu(web3jService);
4654
}
4755

56+
/**
57+
* Construct a new Besu instance.
58+
*
59+
* @param web3jService web3j service instance - i.e. HTTP
60+
* @param pollingInterval polling interval for responses from network nodes
61+
* @param scheduledExecutorService executor service to use for scheduled tasks. <strong>You are
62+
* responsible for terminating this thread pool</strong>
63+
* @return new Besu instance
64+
*/
65+
static Besu build(
66+
Web3jService web3jService,
67+
long pollingInterval,
68+
ScheduledExecutorService scheduledExecutorService) {
69+
return new JsonRpc2_0Besu(web3jService, pollingInterval, scheduledExecutorService);
70+
}
71+
4872
Request<?, MinerStartResponse> minerStart();
4973

5074
Request<?, BooleanResponse> minerStop();
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/*
2+
* Copyright 2019 Web3 Labs Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package org.web3j.protocol.besu;
14+
15+
import io.reactivex.Flowable;
16+
17+
import org.web3j.protocol.core.methods.request.EthFilter;
18+
import org.web3j.protocol.core.methods.response.Log;
19+
20+
public interface BesuRx {
21+
22+
Flowable<Log> privLogFlowable(String privacyGroupId, EthFilter ethFilter);
23+
}

besu/src/main/java/org/web3j/protocol/besu/JsonRpc2_0Besu.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,11 @@
1919
import java.util.Collections;
2020
import java.util.List;
2121
import java.util.Map;
22+
import java.util.concurrent.ScheduledExecutorService;
2223
import java.util.stream.Collectors;
2324

25+
import io.reactivex.Flowable;
26+
2427
import org.web3j.crypto.Credentials;
2528
import org.web3j.protocol.Web3jService;
2629
import org.web3j.protocol.admin.methods.response.BooleanResponse;
@@ -36,6 +39,7 @@
3639
import org.web3j.protocol.besu.response.privacy.PrivateTransactionReceipt;
3740
import org.web3j.protocol.core.DefaultBlockParameter;
3841
import org.web3j.protocol.core.Request;
42+
import org.web3j.protocol.core.methods.request.EthFilter;
3943
import org.web3j.protocol.core.methods.request.Transaction;
4044
import org.web3j.protocol.core.methods.response.EthAccounts;
4145
import org.web3j.protocol.core.methods.response.EthCall;
@@ -44,17 +48,32 @@
4448
import org.web3j.protocol.core.methods.response.EthLog;
4549
import org.web3j.protocol.core.methods.response.EthSendTransaction;
4650
import org.web3j.protocol.core.methods.response.EthUninstallFilter;
51+
import org.web3j.protocol.core.methods.response.Log;
4752
import org.web3j.protocol.core.methods.response.MinerStartResponse;
4853
import org.web3j.protocol.eea.JsonRpc2_0Eea;
4954
import org.web3j.protocol.exceptions.TransactionException;
5055
import org.web3j.tx.response.PollingPrivateTransactionReceiptProcessor;
56+
import org.web3j.utils.Async;
5157
import org.web3j.utils.Base64String;
5258

5359
import static java.util.Objects.requireNonNull;
5460

5561
public class JsonRpc2_0Besu extends JsonRpc2_0Eea implements Besu {
56-
public JsonRpc2_0Besu(Web3jService web3jService) {
57-
super(web3jService);
62+
63+
private final JsonRpc2_0BesuRx besuRx;
64+
private final long blockTime;
65+
66+
public JsonRpc2_0Besu(final Web3jService web3jService) {
67+
this(web3jService, DEFAULT_BLOCK_TIME, Async.defaultExecutorService());
68+
}
69+
70+
public JsonRpc2_0Besu(
71+
Web3jService web3jService,
72+
long pollingInterval,
73+
ScheduledExecutorService scheduledExecutorService) {
74+
super(web3jService, pollingInterval, scheduledExecutorService);
75+
this.besuRx = new JsonRpc2_0BesuRx(this, scheduledExecutorService);
76+
this.blockTime = pollingInterval;
5877
}
5978

6079
@Override
@@ -396,4 +415,9 @@ public Request<?, EthLog> privGetFilterLogs(String privacyGroupId, String filter
396415
web3jService,
397416
EthLog.class);
398417
}
418+
419+
@Override
420+
public Flowable<Log> privLogFlowable(final String privacyGroupId, final EthFilter ethFilter) {
421+
return besuRx.privLogFlowable(privacyGroupId, ethFilter, blockTime);
422+
}
399423
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/*
2+
* Copyright 2019 Web3 Labs Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package org.web3j.protocol.besu;
14+
15+
import java.util.concurrent.ScheduledExecutorService;
16+
17+
import io.reactivex.BackpressureStrategy;
18+
import io.reactivex.Flowable;
19+
import io.reactivex.FlowableEmitter;
20+
21+
import org.web3j.protocol.besu.filters.PrivateLogFilter;
22+
import org.web3j.protocol.core.methods.response.Log;
23+
24+
public class JsonRpc2_0BesuRx {
25+
26+
private final Besu besu;
27+
private final ScheduledExecutorService scheduledExecutorService;
28+
29+
public JsonRpc2_0BesuRx(Besu besu, ScheduledExecutorService scheduledExecutorService) {
30+
this.besu = besu;
31+
this.scheduledExecutorService = scheduledExecutorService;
32+
}
33+
34+
public Flowable<Log> privLogFlowable(
35+
String privacyGroupId,
36+
org.web3j.protocol.core.methods.request.EthFilter ethFilter,
37+
long pollingInterval) {
38+
return Flowable.create(
39+
subscriber -> {
40+
PrivateLogFilter logFilter =
41+
new PrivateLogFilter(
42+
besu, subscriber::onNext, privacyGroupId, ethFilter);
43+
44+
run(logFilter, subscriber, pollingInterval);
45+
},
46+
BackpressureStrategy.BUFFER);
47+
}
48+
49+
private <T> void run(
50+
org.web3j.protocol.core.filters.Filter<T> filter,
51+
FlowableEmitter<? super T> emitter,
52+
long pollingInterval) {
53+
54+
filter.run(scheduledExecutorService, pollingInterval);
55+
emitter.setCancellable(filter::cancel);
56+
}
57+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Copyright 2020 Web3 Labs Ltd.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
5+
* the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
10+
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
11+
* specific language governing permissions and limitations under the License.
12+
*/
13+
package org.web3j.protocol.besu.filters;
14+
15+
import java.io.IOException;
16+
import java.math.BigInteger;
17+
import java.util.Optional;
18+
19+
import org.web3j.protocol.besu.Besu;
20+
import org.web3j.protocol.core.Request;
21+
import org.web3j.protocol.core.filters.Callback;
22+
import org.web3j.protocol.core.filters.LogFilter;
23+
import org.web3j.protocol.core.methods.response.EthFilter;
24+
import org.web3j.protocol.core.methods.response.EthLog;
25+
import org.web3j.protocol.core.methods.response.EthUninstallFilter;
26+
import org.web3j.protocol.core.methods.response.Log;
27+
import org.web3j.utils.Numeric;
28+
29+
public class PrivateLogFilter extends LogFilter {
30+
31+
private final String privacyGroupId;
32+
33+
public PrivateLogFilter(
34+
Besu web3j,
35+
Callback<Log> callback,
36+
String privacyGroupId,
37+
org.web3j.protocol.core.methods.request.EthFilter ethFilter) {
38+
super(web3j, callback, ethFilter);
39+
this.privacyGroupId = privacyGroupId;
40+
}
41+
42+
@Override
43+
protected EthFilter sendRequest() throws IOException {
44+
return ((Besu) web3j).privNewFilter(privacyGroupId, ethFilter).send();
45+
}
46+
47+
@Override
48+
protected EthUninstallFilter uninstallFilter(BigInteger filterId) throws IOException {
49+
return ((Besu) web3j)
50+
.privUninstallFilter(privacyGroupId, Numeric.toHexStringWithPrefix(filterId))
51+
.send();
52+
}
53+
54+
@Override
55+
protected Optional<Request<?, EthLog>> getFilterLogs(BigInteger filterId) {
56+
return Optional.of(
57+
((Besu) web3j)
58+
.privGetFilterLogs(
59+
privacyGroupId, Numeric.toHexStringWithPrefix(filterId)));
60+
}
61+
}

core/src/main/java/org/web3j/protocol/core/filters/BlockFilter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ public BlockFilter(Web3j web3j, Callback<String> callback) {
3030
}
3131

3232
@Override
33-
EthFilter sendRequest() throws IOException {
33+
protected EthFilter sendRequest() throws IOException {
3434
return web3j.ethNewBlockFilter().send();
3535
}
3636

3737
@Override
38-
void process(List<EthLog.LogResult> logResults) {
38+
protected void process(List<EthLog.LogResult> logResults) {
3939
for (EthLog.LogResult logResult : logResults) {
4040
if (logResult instanceof EthLog.Hash) {
4141
String blockHash = ((EthLog.Hash) logResult).get();

core/src/main/java/org/web3j/protocol/core/filters/BlocksFilter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -32,12 +32,12 @@ public BlocksFilter(Web3j web3j, Callback<List<String>> callback) {
3232
}
3333

3434
@Override
35-
EthFilter sendRequest() throws IOException {
35+
protected EthFilter sendRequest() throws IOException {
3636
return web3j.ethNewBlockFilter().send();
3737
}
3838

3939
@Override
40-
void process(List<LogResult> logResults) {
40+
protected void process(List<LogResult> logResults) {
4141
List<String> blockHashes = new ArrayList<>(logResults.size());
4242

4343
for (EthLog.LogResult logResult : logResults) {

core/src/main/java/org/web3j/protocol/core/filters/Filter.java

+10-6
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ public abstract class Filter<T> {
3838

3939
private static final Logger log = LoggerFactory.getLogger(Filter.class);
4040

41-
final Web3j web3j;
42-
final Callback<T> callback;
41+
protected final Web3j web3j;
42+
protected Callback<T> callback;
4343

4444
private volatile BigInteger filterId;
4545

46-
private ScheduledFuture<?> schedule;
46+
protected ScheduledFuture<?> schedule;
4747

4848
private ScheduledExecutorService scheduledExecutorService;
4949

@@ -145,9 +145,9 @@ private void pollFilter(EthFilter ethFilter) {
145145
}
146146
}
147147

148-
abstract EthFilter sendRequest() throws IOException;
148+
protected abstract EthFilter sendRequest() throws IOException;
149149

150-
abstract void process(List<EthLog.LogResult> logResults);
150+
protected abstract void process(List<EthLog.LogResult> logResults);
151151

152152
private void reinstallFilter() {
153153
log.warn("The filter has not been found. Filter id: " + filterId);
@@ -159,7 +159,7 @@ public void cancel() {
159159
schedule.cancel(false);
160160

161161
try {
162-
EthUninstallFilter ethUninstallFilter = web3j.ethUninstallFilter(filterId).send();
162+
EthUninstallFilter ethUninstallFilter = uninstallFilter(filterId);
163163
if (ethUninstallFilter.hasError()) {
164164
throwException(ethUninstallFilter.getError());
165165
}
@@ -172,6 +172,10 @@ public void cancel() {
172172
}
173173
}
174174

175+
protected EthUninstallFilter uninstallFilter(BigInteger filterId) throws IOException {
176+
return web3j.ethUninstallFilter(filterId).send();
177+
}
178+
175179
/**
176180
* Retrieves historic filters for the filter with the given id. Getting historic logs is not
177181
* supported by all filters. If not the method should return an empty EthLog object

core/src/main/java/org/web3j/protocol/core/filters/LogFilter.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
/** Log filter handler. */
2727
public class LogFilter extends Filter<Log> {
2828

29-
private final org.web3j.protocol.core.methods.request.EthFilter ethFilter;
29+
protected final org.web3j.protocol.core.methods.request.EthFilter ethFilter;
3030

3131
public LogFilter(
3232
Web3j web3j,
@@ -37,12 +37,12 @@ public LogFilter(
3737
}
3838

3939
@Override
40-
EthFilter sendRequest() throws IOException {
40+
protected EthFilter sendRequest() throws IOException {
4141
return web3j.ethNewFilter(ethFilter).send();
4242
}
4343

4444
@Override
45-
void process(List<EthLog.LogResult> logResults) {
45+
protected void process(List<EthLog.LogResult> logResults) {
4646
for (EthLog.LogResult logResult : logResults) {
4747
if (logResult instanceof EthLog.LogObject) {
4848
Log log = ((EthLog.LogObject) logResult).get();

core/src/main/java/org/web3j/protocol/core/filters/LogsFilter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ public LogsFilter(
3939
}
4040

4141
@Override
42-
EthFilter sendRequest() throws IOException {
42+
protected EthFilter sendRequest() throws IOException {
4343
return web3j.ethNewFilter(ethFilter).send();
4444
}
4545

4646
@Override
47-
void process(List<LogResult> logResults) {
47+
protected void process(List<LogResult> logResults) {
4848
List<Log> logs = new ArrayList<>(logResults.size());
4949

5050
for (EthLog.LogResult logResult : logResults) {

core/src/main/java/org/web3j/protocol/core/filters/PendingTransactionFilter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,12 @@ public PendingTransactionFilter(Web3j web3j, Callback<String> callback) {
3030
}
3131

3232
@Override
33-
EthFilter sendRequest() throws IOException {
33+
protected EthFilter sendRequest() throws IOException {
3434
return web3j.ethNewPendingTransactionFilter().send();
3535
}
3636

3737
@Override
38-
void process(List<EthLog.LogResult> logResults) {
38+
protected void process(List<EthLog.LogResult> logResults) {
3939
for (EthLog.LogResult logResult : logResults) {
4040
if (logResult instanceof EthLog.Hash) {
4141
String transactionHash = ((EthLog.Hash) logResult).get();

core/src/main/java/org/web3j/protocol/core/filters/PendingTransactionsFilter.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ public PendingTransactionsFilter(Web3j web3j, Callback<List<String>> callback) {
3131
}
3232

3333
@Override
34-
EthFilter sendRequest() throws IOException {
34+
protected EthFilter sendRequest() throws IOException {
3535
return web3j.ethNewPendingTransactionFilter().send();
3636
}
3737

3838
@Override
39-
void process(List<EthLog.LogResult> logResults) {
39+
protected void process(List<EthLog.LogResult> logResults) {
4040
List<String> logs = new ArrayList<>(logResults.size());
4141

4242
for (EthLog.LogResult logResult : logResults) {

0 commit comments

Comments
 (0)