Skip to content

Commit beb4cc8

Browse files
committed
Add rep-synch functionality
The parent can now refresh/synch the child. Added GUI entry point into the rep/synch tool. The functionality is now more thread safe.
1 parent 5463da2 commit beb4cc8

63 files changed

Lines changed: 1939 additions & 2326 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/MongoDBRyaDAO.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ public synchronized void setConf(final StatefulMongoDBRdfConfiguration conf) {
8282
auths = conf.getAuthorizations();
8383
flushEachUpdate.set(conf.flushEachUpdate());
8484
}
85-
86-
85+
8786
public void setDB(final DB db) {
8887
this.db = db;
8988
}
@@ -107,7 +106,7 @@ public void init() throws RyaDAOException {
107106
index.setConf(conf);
108107
}
109108

110-
db = mongoClient.getDB(conf.get(MongoDBRdfConfiguration.MONGO_DB_NAME));
109+
db = mongoClient.getDB(conf.getRyaInstanceName());
111110
coll = db.getCollection(conf.getTriplesCollectionName());
112111
nameSpaceManager = new SimpleMongoDBNamespaceManager(db.getCollection(conf.getNameSpacesCollectionName()));
113112
queryEngine = new MongoDBQueryEngine();
@@ -307,4 +306,4 @@ private void flushIndexers() throws RyaDAOException {
307306
}
308307
}
309308
}
310-
}
309+
}

dao/mongodb.rya/src/main/java/org/apache/rya/mongodb/document/visibility/DocumentVisibilityAdapter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ public static DocumentVisibility toDocumentVisibility(final DBObject mongoObj) t
113113
documentVisibilityArray = (Object[]) documentVisibilityObject;
114114
} else if (documentVisibilityObject instanceof BasicDBList) {
115115
documentVisibilityArray = DocumentVisibilityUtil.convertBasicDBListToObjectArray((BasicDBList) documentVisibilityObject);
116+
} else {
117+
documentVisibilityArray = new String[] {""};
116118
}
117119

118120
final String documentVisibilityString = DocumentVisibilityUtil.multidimensionalArrayToBooleanString(documentVisibilityArray);

dao/mongodb.rya/src/test/java/org/apache/rya/mongodb/MongoRyaITBase.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,9 @@ protected void beforeTest() throws Exception {
4141
// Setup the configuration that will be used within the test.
4242
final MongoDBRdfConfiguration conf = new MongoDBRdfConfiguration( new Configuration() );
4343
conf.setBoolean("sc.useMongo", true);
44-
conf.setTablePrefix("test_");
45-
conf.setMongoDBName(conf.getRyaInstanceName());
46-
conf.setMongoHostname( super.getMongoHostname() );
47-
conf.setMongoPort("" + super.getMongoPort());
44+
conf.setRyaInstanceName("mongo_test");
45+
conf.setMongoHostname(getMongoHostname());
46+
conf.setMongoPort(getMongoPort() + "");
4847

4948
// Let tests update the configuration.
5049
updateConfiguration(conf);
@@ -77,4 +76,4 @@ public MongoCollection<Document> getRyaCollection() {
7776
public DBCollection getRyaDbCollection() {
7877
return getMongoClient().getDB(conf.getMongoDBName()).getCollection(conf.getTriplesCollectionName());
7978
}
80-
}
79+
}

extras/indexing/src/main/java/org/apache/rya/sail/config/RyaSailFactory.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -80,19 +80,25 @@ private static Sail getRyaSail(final Configuration config) throws InferenceEngin
8080

8181
final String user;
8282
final String pswd;
83-
// XXX Should(?) be MongoDBRdfConfiguration.MONGO_COLLECTION_PREFIX inside the if below. RYA-135
84-
final String ryaInstance = config.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
85-
Objects.requireNonNull(ryaInstance, "RyaInstance or table prefix is missing from configuration."+RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
83+
final String ryaInstance;
8684

8785
if(ConfigUtils.getUseMongo(config)) {
8886
// Get a reference to a Mongo DB configuration object.
89-
final MongoDBRdfConfiguration mongoConfig = (config instanceof MongoDBRdfConfiguration) ?
87+
final MongoDBRdfConfiguration mongoConfig = config instanceof MongoDBRdfConfiguration ?
9088
(MongoDBRdfConfiguration)config : new MongoDBRdfConfiguration(config);
89+
90+
ryaInstance = mongoConfig.getRyaInstanceName();
91+
92+
requireNonNull(ryaInstance, "RyaInstance is missing from configuration." + MongoDBRdfConfiguration.RYA_INSTANCE_NAME);
93+
9194
// Instantiate a Mongo client and Mongo DAO.
9295
dao = getMongoDAO(mongoConfig);
9396
// Then use the DAO's newly-created stateful conf in place of the original
9497
rdfConfig = dao.getConf();
9598
} else {
99+
ryaInstance = config.get(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
100+
Objects.requireNonNull(ryaInstance, "RyaInstance or table prefix is missing from configuration."+RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX);
101+
96102
rdfConfig = new AccumuloRdfConfiguration(config);
97103
user = rdfConfig.get(ConfigUtils.CLOUDBASE_USER);
98104
pswd = rdfConfig.get(ConfigUtils.CLOUDBASE_PASSWORD);
@@ -221,7 +227,7 @@ public static void updateAccumuloConfig(final AccumuloRdfConfiguration config, f
221227
* @return - MongoDBRyaDAO with Indexers configured according to user's specification
222228
* @throws RyaDAOException if the DAO can't be initialized
223229
*/
224-
public static MongoDBRyaDAO getMongoDAO(MongoDBRdfConfiguration mongoConfig) throws RyaDAOException {
230+
public static MongoDBRyaDAO getMongoDAO(final MongoDBRdfConfiguration mongoConfig) throws RyaDAOException {
225231
// Create the MongoClient that will be used by the Sail object's components.
226232
final MongoClient client = createMongoClient(mongoConfig);
227233

extras/rya.export/export.accumulo/pom.xml

Lines changed: 0 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -84,46 +84,4 @@ under the License.
8484
<artifactId>libthrift</artifactId>
8585
</dependency>
8686
</dependencies>
87-
<build>
88-
<plugins>
89-
<plugin>
90-
<groupId>org.codehaus.mojo</groupId>
91-
<artifactId>jaxb2-maven-plugin</artifactId>
92-
<executions>
93-
<execution>
94-
<id>xjc</id>
95-
<goals>
96-
<goal>xjc</goal>
97-
</goals>
98-
</execution>
99-
</executions>
100-
<configuration>
101-
<packageName>org.apache.rya.export</packageName>
102-
</configuration>
103-
</plugin>
104-
105-
<plugin>
106-
<groupId>com.mycila</groupId>
107-
<artifactId>license-maven-plugin</artifactId>
108-
<configuration>
109-
<header>${project.basedir}/src/license/header.txt</header>
110-
</configuration>
111-
<executions>
112-
<execution>
113-
<id>update-generated-source-headers</id>
114-
<configuration>
115-
<basedir>${project.build.directory}/generated-sources</basedir>
116-
<mapping>
117-
<sun-jaxb.episode>XML_STYLE</sun-jaxb.episode>
118-
</mapping>
119-
</configuration>
120-
<phase>process-sources</phase>
121-
<goals>
122-
<goal>format</goal>
123-
</goals>
124-
</execution>
125-
</executions>
126-
</plugin>
127-
</plugins>
128-
</build>
12987
</project>

extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/AccumuloRyaStatementStore.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.rya.export.api.store.UpdateStatementException;
5151
import org.eclipse.rdf4j.common.iteration.CloseableIteration;
5252

53-
import com.google.common.base.Function;
5453
import com.google.common.collect.Iterators;
5554

5655
/**
@@ -74,6 +73,7 @@ public class AccumuloRyaStatementStore implements RyaStatementStore {
7473
private final Set<IteratorSetting> iteratorSettings = new HashSet<>();
7574
private final AccumuloParentMetadataRepository metadataRepo;
7675

76+
private final String ryaInstanceName;
7777
/**
7878
* Creates a new instance of {@link AccumuloRyaStatementStore}.
7979
* @param dao the {@AccumuloRyaDAO}.
@@ -87,6 +87,7 @@ public AccumuloRyaStatementStore(final AccumuloRyaDAO dao, final String tablePre
8787
}
8888
accumuloRyaDao = dao;
8989
metadataRepo = new AccumuloParentMetadataRepository(dao);
90+
ryaInstanceName = ryaInstance;
9091
}
9192

9293
@Override
@@ -105,9 +106,7 @@ public Iterator<RyaStatement> fetchStatements() throws FetchStatementException {
105106
}
106107
// Convert Entry iterator to RyaStatement iterator
107108
final Iterator<Entry<Key, Value>> entryIter = scanner.iterator();
108-
final Iterator<RyaStatement> ryaStatementIter = Iterators.transform(entryIter, new Function<Entry<Key, Value>, RyaStatement>() {
109-
@Override
110-
public RyaStatement apply(final Entry<Key, Value> entry) {
109+
final Iterator<RyaStatement> ryaStatementIter = Iterators.transform(entryIter, entry -> {
111110
final Key key = entry.getKey();
112111
final Value value = entry.getValue();
113112
RyaStatement ryaStatement = null;
@@ -117,14 +116,19 @@ public RyaStatement apply(final Entry<Key, Value> entry) {
117116
log.error("Unable to convert the key/value pair into a Rya Statement", e);
118117
}
119118
return ryaStatement;
120-
}
121-
});
119+
});
122120
return ryaStatementIter;
123121
} catch (final Exception e) {
124122
throw new FetchStatementException("Failed to fetch statements.", e);
125123
}
126124
}
127125

126+
@Override
127+
public long count() {
128+
//accumulo cannot count.
129+
return -1;
130+
}
131+
128132
@Override
129133
public void addStatement(final RyaStatement statement) throws AddStatementException {
130134
try {
@@ -141,6 +145,16 @@ public void addStatement(final RyaStatement statement) throws AddStatementExcept
141145
}
142146
}
143147

148+
@Override
149+
public void addStatements(final Iterator<RyaStatement> statements) throws AddStatementException {
150+
try {
151+
accumuloRyaDao.add(statements);
152+
accumuloRyaDao.flush();
153+
} catch (final RyaDAOException e) {
154+
throw new AddStatementException("Unable to add the Rya Statement", e);
155+
}
156+
}
157+
144158
@Override
145159
public void removeStatement(final RyaStatement statement) throws RemoveStatementException {
146160
try {
@@ -216,4 +230,9 @@ public void addIterator(final IteratorSetting iteratorSetting) {
216230
checkNotNull(iteratorSetting);
217231
iteratorSettings.add(iteratorSetting);
218232
}
233+
234+
@Override
235+
public String getRyaInstanceName() {
236+
return ryaInstanceName;
237+
}
219238
}

extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/conf/AccumuloExportConstants.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.log4j.Logger;
2828
import org.apache.rya.accumulo.mr.MRUtils;
2929
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
30-
import org.apache.rya.export.InstanceType;
3130
import org.apache.rya.indexing.accumulo.ConfigUtils;
3231

3332
import com.google.common.collect.ImmutableList;
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.apache.rya.export.accumulo.conf;
2+
3+
public enum InstanceType {
4+
MOCK,
5+
MINI,
6+
DISTRIBUTION;
7+
}

extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/parent/AccumuloParentMetadataRepository.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ private MergeParentMetadata getMetadataFromTable() throws ParentMetadataDoesNotE
141141
// Fetch the metadata from the entries.
142142
String ryaInstanceName = null;
143143
Date timestamp = null;
144-
Date filterTimestamp = null;
144+
long filterTimestamp = -1L;
145145
Long parentTimeOffset = null;
146146

147147
while (entries.hasNext()) {
@@ -154,7 +154,7 @@ private MergeParentMetadata getMetadataFromTable() throws ParentMetadataDoesNotE
154154
} else if (columnQualifier.equals(MERGE_PARENT_METADATA_TIMESTAMP)) {
155155
timestamp = DATE_LEXICODER.decode(value);
156156
} else if (columnQualifier.equals(MERGE_PARENT_METADATA_FILTER_TIMESTAMP)) {
157-
filterTimestamp = DATE_LEXICODER.decode(value);
157+
filterTimestamp = LONG_LEXICODER.decode(value);
158158
} else if (columnQualifier.equals(MERGE_PARENT_METADATA_PARENT_TIME_OFFSET)) {
159159
parentTimeOffset = LONG_LEXICODER.decode(value);
160160
}
@@ -220,8 +220,8 @@ private static List<Mutation> makeWriteMetadataMutations(final MergeParentMetada
220220
mutations.add(timestampMutation);
221221

222222
// Filter Timestamp
223-
if (metadata.getFilterTimestamp() != null) {
224-
final Mutation filterTimestampMutation = makeFieldMutation(metadata.getFilterTimestamp(), DATE_LEXICODER, MERGE_PARENT_METADATA_FILTER_TIMESTAMP);
223+
if (metadata.getFilterTimestamp() != -1L) {
224+
final Mutation filterTimestampMutation = makeFieldMutation(metadata.getFilterTimestamp(), LONG_LEXICODER, MERGE_PARENT_METADATA_FILTER_TIMESTAMP);
225225
mutations.add(filterTimestampMutation);
226226
}
227227

extras/rya.export/export.accumulo/src/main/java/org/apache/rya/export/accumulo/policy/TimestampPolicyAccumuloRyaStatementStore.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@
1818
*/
1919
package org.apache.rya.export.accumulo.policy;
2020

21-
import java.util.Date;
2221
import java.util.Iterator;
2322

2423
import org.apache.accumulo.core.client.IteratorSetting;
2524
import org.apache.accumulo.core.iterators.user.TimestampFilter;
2625
import org.apache.rya.api.domain.RyaStatement;
2726
import org.apache.rya.export.accumulo.AccumuloRyaStatementStore;
28-
import org.apache.rya.export.api.conf.policy.TimestampPolicyStatementStore;
27+
import org.apache.rya.export.api.policy.TimestampPolicyStatementStore;
2928
import org.apache.rya.export.api.store.FetchStatementException;
3029
import org.apache.rya.export.api.store.RyaStatementStore;
3130

@@ -34,31 +33,32 @@
3433
* filter statements based on a timestamp.
3534
*/
3635
public class TimestampPolicyAccumuloRyaStatementStore extends TimestampPolicyStatementStore {
37-
36+
//an instance is held onto to be able to add iterators to.
37+
private final AccumuloRyaStatementStore store;
3838
/**
3939
* Creates a new {@link TimestampPolicyAccumuloRyaStatementStore}
4040
* @param store
41-
* @param timestamp
4241
*/
43-
public TimestampPolicyAccumuloRyaStatementStore(final AccumuloRyaStatementStore store, final Date timestamp) {
42+
public TimestampPolicyAccumuloRyaStatementStore(final AccumuloRyaStatementStore store, final long timestamp) {
4443
super(store, timestamp);
45-
store.addIterator(getStartTimeSetting(timestamp));
44+
this.store = store;
4645
}
4746

4847
/**
4948
* Creates an {@link IteratorSetting} with a time stamp filter that starts with the specified data.
5049
* @param time the start time of the filter.
5150
* @return the {@link IteratorSetting}.
5251
*/
53-
private static IteratorSetting getStartTimeSetting(final Date time) {
52+
private IteratorSetting getStartTimeSetting() {
5453
final IteratorSetting setting = new IteratorSetting(1, "startTimeIterator", TimestampFilter.class);
55-
TimestampFilter.setStart(setting, time.getTime(), true);
54+
TimestampFilter.setStart(setting, timestamp, true);
5655
TimestampFilter.setEnd(setting, Long.MAX_VALUE, true);
5756
return setting;
5857
}
5958

6059
@Override
6160
public Iterator<RyaStatement> fetchStatements() throws FetchStatementException {
61+
store.addIterator(getStartTimeSetting());
6262
return store.fetchStatements();
6363
}
6464
}

0 commit comments

Comments
 (0)