From 4bc8494bbc881259a29b523cac50f5a36013a929 Mon Sep 17 00:00:00 2001 From: bern-SOPTIM Date: Wed, 3 Jun 2026 00:23:44 +0200 Subject: [PATCH] GH-3969: Native stream() implementations for DatasetGraph Replace the find()-wrapping default of DatasetGraph.stream(g,s,p,o) with first-class stream support across the hierarchy. DatasetGraphBaseFind gets a stream() path mirroring the find() default-/named-/union-graph split, backed by streamInDftGraph / streamInSpecificNamedGraph / streamInAnyNamedGraphs primitives implemented per dataset. GraphView streams graph-over-dataset through DatasetGraph#stream; adds G stream helpers. The interface method stays default, so implementors are not broken. Includes stream()==find() parity tests and a forEachRemaining check in IteratorTxnTracker so stream bulk operations stay inside their transaction. --- .../riot/writer/c14n/DatasetGraphOrdered.java | 11 ++ .../sparql/core/DatasetGraphBaseFind.java | 47 +++++++- .../sparql/core/DatasetGraphCollection.java | 20 ++++ .../sparql/core/DatasetGraphFilteredView.java | 15 +++ .../jena/sparql/core/DatasetGraphMap.java | 18 +++ .../jena/sparql/core/DatasetGraphNull.java | 16 +++ .../jena/sparql/core/DatasetGraphOne.java | 18 +++ .../jena/sparql/core/DatasetGraphWrapper.java | 9 ++ .../apache/jena/sparql/core/GraphView.java | 19 ++++ .../sparql/core/mem/DatasetGraphInMemory.java | 33 ++++++ .../jena/sparql/util/DyadicDatasetGraph.java | 15 +++ .../main/java/org/apache/jena/system/G.java | 19 ++++ .../buffering/BufferingDatasetGraph.java | 52 +++++++++ .../sparql/core/AbstractDatasetGraphFind.java | 105 ++++++++++++++++++ .../AbstractTestGraphOverDatasetGraph.java | 27 +++++ .../sparql/core/DatasetGraphSimpleMem.java | 42 +++++-- .../core/TestDatasetGraphFilteredView.java | 32 ++++++ .../jena/sparql/core/TestSpecialDatasets.java | 46 ++++++++ .../org/apache/jena/system/TestG_Quad.java | 15 +++ .../org/apache/jena/system/TestG_Triple.java | 9 ++ .../buffering/TestBufferingDatasetGraph.java | 66 
++++++++++- .../storage/system/DatasetGraphStorage.java | 19 +++- .../transaction/txn/IteratorTxnTracker.java | 7 ++ .../jena/tdb1/store/DatasetGraphTDB.java | 13 +++ .../jena/tdb2/store/GraphViewSwitchable.java | 7 ++ 25 files changed, 667 insertions(+), 13 deletions(-) diff --git a/jena-arq/src/main/java/org/apache/jena/riot/writer/c14n/DatasetGraphOrdered.java b/jena-arq/src/main/java/org/apache/jena/riot/writer/c14n/DatasetGraphOrdered.java index 4051d5c7b29..465357eb310 100644 --- a/jena-arq/src/main/java/org/apache/jena/riot/writer/c14n/DatasetGraphOrdered.java +++ b/jena-arq/src/main/java/org/apache/jena/riot/writer/c14n/DatasetGraphOrdered.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.LinkedHashSet; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.graph.Graph; @@ -65,6 +66,16 @@ public Iterator findNG(Node g, Node s, Node p, Node o) { return iterator; } + @Override + public Stream stream(Node g, Node s, Node p, Node o) { + return quads.stream().filter(quad -> matches(quad, g, s, p, o)); + } + + @Override + public Stream stream() { + return quads.stream(); + } + @Override public void add(Quad quad) { quads.add(quad); diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphBaseFind.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphBaseFind.java index 10850be8375..9692e8c6895 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphBaseFind.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphBaseFind.java @@ -82,12 +82,46 @@ protected Iterator findAny(Node s, Node p, Node o) { return Iter.append(iter1, iter2); } + @Override + public Stream stream(Node g, Node s, Node p, Node o) { + if ( Quad.isDefaultGraph(g)) + return streamInDftGraph(s, p, o) ; + if ( ! isWildcard(g) ) + return streamNG(g, s, p, o) ; + return streamAny(s, p, o) ; + } + + /** Stream equivalent of {@link #findNG(Node, Node, Node, Node)}. 
*/ + public Stream streamNG(Node g, Node s, Node p , Node o) { + Stream stream ; + if ( Quad.isUnionGraph(g)) + stream = streamQuadsInUnionGraph(s, p, o) ; + else if ( isWildcard(g) ) + stream = streamInAnyNamedGraphs(s, p, o) ; + else if ( Quad.isDefaultGraph(g) ) + stream = streamInDftGraph(s, p, o) ; + else + // Not wildcard, not union graph, not default graph. + stream = streamInSpecificNamedGraph(g, s, p, o) ; + if ( stream == null ) + return Stream.empty() ; + return stream ; + } + + protected Stream streamAny(Node s, Node p, Node o) { + return Stream.concat(streamInDftGraph(s, p, o), streamInAnyNamedGraphs(s, p, o)) ; + } + /** Find matches in the default graph. * Return as quads; the default graph is {@link Quad#defaultGraphIRI} * To get Triples, use {@code DatasetGraph.getDefaultGraph().find(...)}. */ protected abstract Iterator findInDftGraph(Node s, Node p , Node o) ; + /** Stream equivalent of {@link #findInDftGraph}. + */ + protected abstract Stream streamInDftGraph(Node s, Node p, Node o) ; + /** Find matches in the notional union of all named graphs - return as triples. * No duplicates - the union graph is a set of triples. * See {@link #findInAnyNamedGraphs}, where there may be duplicates. @@ -110,6 +144,11 @@ public Iterator findQuadsInUnionGraph(Node s, Node p , Node o) { return findUnionGraphTriples(s,p,o).map(t -> Quad.create(Quad.unionGraph, t)).iterator() ; } + /** Stream equivalent of {@link #findQuadsInUnionGraph}. */ + public Stream streamQuadsInUnionGraph(Node s, Node p , Node o) { + return findUnionGraphTriples(s,p,o).map(t -> Quad.create(Quad.unionGraph, t)) ; + } + /** Find matches in the notional union of all named graphs - return as triples. * No duplicates - the union graph is a set of triples. * See {@link #findInAnyNamedGraphs}, where there may be duplicates. @@ -118,15 +157,21 @@ public Iterator findQuadsInUnionGraph(Node s, Node p , Node o) { * may be possible to avoid "distinct". 
*/ private Stream findUnionGraphTriples(Node s, Node p , Node o) { - return Iter.asStream(findInAnyNamedGraphs(s,p,o)).map(Quad::asTriple).distinct() ; + return streamInAnyNamedGraphs(s,p,o).map(Quad::asTriple).distinct() ; } /** Find in a specific named graph - {@code g} is a ground term (IRI or bNode), not a wild card (or null). */ protected abstract Iterator findInSpecificNamedGraph(Node g, Node s, Node p , Node o) ; + /** Stream equivalent of {@link #findInSpecificNamedGraph}. */ + protected abstract Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) ; + /** Find in any named graph - return quads. * If a triple matches in two different graph, return a quad for each. * See {@link #findInUnionGraph} for matching without duplicate triples. */ protected abstract Iterator findInAnyNamedGraphs(Node s, Node p , Node o) ; + + /** Stream equivalent of {@link #findInAnyNamedGraphs}. */ + protected abstract Stream streamInAnyNamedGraphs(Node s, Node p, Node o) ; } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphCollection.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphCollection.java index 56bd86ba878..ea6fc4eddaa 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphCollection.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphCollection.java @@ -24,6 +24,7 @@ import java.util.Iterator ; import java.util.List ; import java.util.Objects ; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter ; import org.apache.jena.atlas.iterator.IteratorConcat ; @@ -63,6 +64,11 @@ protected Iterator findInDftGraph(Node s, Node p , Node o) return G.triples2quadsDftGraph(getDefaultGraph().find(s, p, o)) ; } + @Override + protected Stream streamInDftGraph(Node s, Node p, Node o) { + return G.triples2quadsDftGraph(getDefaultGraph().stream(s, p, o)) ; + } + @Override protected Iter findInSpecificNamedGraph(Node g, Node s, Node p , Node o) { @@ -72,6 +78,14 @@ 
protected Iter findInSpecificNamedGraph(Node g, Node s, Node p , Node o) return G.triples2quads(g, graph.find(s, p, o)) ; } + @Override + protected Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + Graph graph = fetchGraph(g) ; + if ( graph == null ) + return Stream.empty() ; + return G.triples2quads(g, graph.stream(s, p, o)) ; + } + @Override protected Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { @@ -89,6 +103,12 @@ protected Iterator findInAnyNamedGraphs(Node s, Node p, Node o) return iter ; } + @Override + protected Stream streamInAnyNamedGraphs(Node s, Node p, Node o) { + return Iter.asStream(listGraphNodes()) + .flatMap(gn -> streamInSpecificNamedGraph(gn, s, p, o)); + } + @Override public abstract Iterator listGraphNodes() ; diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphFilteredView.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphFilteredView.java index 2fd97212e8e..9756b80c2ba 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphFilteredView.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphFilteredView.java @@ -25,6 +25,7 @@ import java.util.HashSet; import java.util.Iterator; import java.util.function.Predicate; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.atlas.logging.Log; @@ -90,6 +91,12 @@ private Iterator filter(Iterator iter) { return iter; return Iter.filter(iter, this::filter); } + + private Stream filter(Stream stream) { + if ( this.quadFilter == null ) + return stream; + return stream.filter(this::filter); + } // Need to intercept these because otherwise that are a GraphView of the wrapped "dsg", not this one. 
@@ -149,6 +156,14 @@ public Iterator find() { return filter(super.find(g, s, p, o)).hasNext(); } + @Override public Stream stream(Node g, Node s, Node p, Node o) { + return filter(super.stream(g, s, p, o)); + } + + @Override public Stream stream() { + return filter(super.stream()); + } + @Override public boolean contains(Quad quad) { return filter(super.find(quad)).hasNext(); } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphMap.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphMap.java index 40f390dc1c1..1e1320a994d 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphMap.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphMap.java @@ -26,7 +26,9 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Objects; +import java.util.stream.Stream; +import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.atlas.iterator.IteratorConcat; import org.apache.jena.graph.Graph; import org.apache.jena.graph.Node; @@ -157,12 +159,22 @@ protected Iterator findInDftGraph(Node s, Node p, Node o) { return G.triples2quadsDftGraph(iter) ; } + @Override + protected Stream streamInDftGraph(Node s, Node p, Node o) { + return G.triples2quadsDftGraph(getDefaultGraph().stream(s, p, o)); + } + @Override protected Iterator findInSpecificNamedGraph(Node g, Node s, Node p, Node o) { Iterator iter = getGraph(g).find(s, p, o); return G.triples2quads(g, iter); } + @Override + protected Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + return G.triples2quads(g, getGraph(g).stream(s, p, o)); + } + @Override protected Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { Iterator gnames = listGraphNodes(); @@ -178,6 +190,12 @@ protected Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { return iter; } + @Override + protected Stream streamInAnyNamedGraphs(Node s, Node p, Node o) { + return Iter.asStream(listGraphNodes()) + .flatMap(gn -> 
streamInSpecificNamedGraph(gn, s, p, o)); + } + @Override public Graph getDefaultGraph() { return defaultGraph; diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphNull.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphNull.java index 55b97497f86..5d0bb4a4466 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphNull.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphNull.java @@ -22,6 +22,7 @@ package org.apache.jena.sparql.core; import java.util.Iterator; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.graph.Graph; @@ -73,16 +74,31 @@ protected Iterator findInDftGraph(Node s, Node p, Node o) { return Iter.nullIterator(); } + @Override + protected Stream streamInDftGraph(Node s, Node p, Node o) { + return Stream.empty(); + } + @Override protected Iterator findInSpecificNamedGraph(Node g, Node s, Node p, Node o) { return Iter.nullIterator(); } + @Override + protected Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + return Stream.empty(); + } + @Override protected Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { return Iter.nullIterator(); } + @Override + protected Stream streamInAnyNamedGraphs(Node s, Node p, Node o) { + return Stream.empty(); + } + @Override public Graph getDefaultGraph() { return dftGraph; diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphOne.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphOne.java index 4d1cada1fc0..a2b26472e12 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphOne.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphOne.java @@ -22,6 +22,7 @@ package org.apache.jena.sparql.core; import java.util.Iterator; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.graph.Graph; @@ -202,18 +203,35 @@ protected Iterator 
findInDftGraph(Node s, Node p, Node o) { return G.triples2quadsDftGraph(graph.find(s, p, o)); } + @Override + protected Stream streamInDftGraph(Node s, Node p, Node o) { + return G.triples2quadsDftGraph(graph.stream(s, p, o)); + } + @Override protected Iterator findInSpecificNamedGraph(Node g, Node s, Node p, Node o) { // There are no named graphs return Iter.nullIterator(); } + @Override + protected Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + // There are no named graphs + return Stream.empty(); + } + @Override protected Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { // There are no named graphs return Iter.nullIterator(); } + @Override + protected Stream streamInAnyNamedGraphs(Node s, Node p, Node o) { + // There are no named graphs + return Stream.empty(); + } + protected static boolean isDefaultGraph(Quad quad) { return isDefaultGraph(quad.getGraph()); } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphWrapper.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphWrapper.java index 2ceb5054e58..3482012b2a1 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphWrapper.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/DatasetGraphWrapper.java @@ -22,6 +22,7 @@ package org.apache.jena.sparql.core; import java.util.Iterator; +import java.util.stream.Stream; import org.apache.jena.atlas.lib.Sync; import org.apache.jena.graph.Graph; @@ -211,6 +212,14 @@ public Iterator find(Node g, Node s, Node p, Node o) public Iterator findNG(Node g, Node s, Node p, Node o) { return getR().findNG(g, s, p, o); } + @Override + public Stream stream(Node g, Node s, Node p, Node o) + { return getR().stream(g, s, p, o); } + + @Override + public Stream stream() + { return getR().stream(); } + @Override public boolean contains(Quad quad) { return getR().contains(quad); } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/GraphView.java 
b/jena-arq/src/main/java/org/apache/jena/sparql/core/GraphView.java index 18193060828..a28d6ccb5a3 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/GraphView.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/GraphView.java @@ -22,6 +22,7 @@ package org.apache.jena.sparql.core; import java.util.Iterator ; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter ; import org.apache.jena.atlas.lib.Sync ; @@ -125,6 +126,24 @@ protected ExtendedIterator graphBaseFind(Node s, Node p, Node o) { return WrappedIterator.createNoRemove(iter) ; } + /** + * Stream equivalent of {@link #graphBaseFind} / {@link #graphUnionFind}: routes through DatasetGraph#stream + * so that a stream-native dataset (e.g. an in-memory one) stays on streams instead of wrapping an iterator; union-graph results are de-duplicated. + * @param s subject match + * @param p predicate match + * @param o object match + * @return stream of matching triples + */ + @Override + public Stream stream(Node s, Node p, Node o) { + Node g = graphNode(graphName) ; + Stream stream = G.quads2triples(dsg.stream(g, s, p, o)) ; + if ( Quad.isUnionGraph(graphName) ) + // Suppress duplicates after projecting to triples. + stream = stream.distinct() ; + return stream ; + } + private static Node graphNode(Node gn) { return ( gn == null ) ? 
Quad.defaultGraphNodeGenerated : gn ; } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/mem/DatasetGraphInMemory.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/mem/DatasetGraphInMemory.java index 8f490660bdb..f36bc605b13 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/mem/DatasetGraphInMemory.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/mem/DatasetGraphInMemory.java @@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantLock ; import java.util.function.Consumer; import java.util.function.Supplier; +import java.util.stream.Stream; import org.apache.jena.atlas.lib.InternalErrorException ; import org.apache.jena.graph.Graph; @@ -321,6 +322,11 @@ private Iterator quadsFinder(final Node g, final Node s, final Node p, fin return quadsIndex().find(g, s, p, o).iterator(); } + private Stream quadsStreamer(final Node g, final Node s, final Node p, final Node o) { + if (isUnionGraph(g)) return streamInUnionGraph$(s, p, o); + return quadsIndex().find(g, s, p, o); + } + /** * Union graph is the merge of named graphs. */ @@ -329,10 +335,22 @@ private Iterator quadsFinder(final Node g, final Node s, final Node p, fin return access(() -> quadsIndex().findInUnionGraph(s, p, o).iterator()); } + /** + * Union graph is the merge of named graphs. + */ + // Temp - Should this be replaced by DatasetGraphBaseFind code? 
+ private Stream streamInUnionGraph$(final Node s, final Node p, final Node o) { + return access(() -> quadsIndex().findInUnionGraph(s, p, o)); + } + private Iterator triplesFinder(final Node s, final Node p, final Node o) { return G.triples2quadsDftGraph(defaultGraph().find(s, p, o).iterator()); } + private Stream triplesStreamer(final Node s, final Node p, final Node o) { + return G.triples2quadsDftGraph(defaultGraph().find(s, p, o)); + } + @Override public Graph getGraph(final Node graphNode) { return GraphView.createNamedGraph(this, graphNode); @@ -436,13 +454,28 @@ protected Iterator findInDftGraph(final Node s, final Node p, final Node o return access(() -> triplesFinder(s, p, o)); } + @Override + protected Stream streamInDftGraph(Node s, Node p, Node o) { + return access(() -> triplesStreamer(s, p, o)); + } + @Override protected Iterator findInSpecificNamedGraph(final Node g, final Node s, final Node p, final Node o) { return access(() -> quadsFinder(g, s, p, o)); } + @Override + protected Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + return access(() -> quadsStreamer(g, s, p, o)); + } + @Override protected Iterator findInAnyNamedGraphs(final Node s, final Node p, final Node o) { return findInSpecificNamedGraph(ANY, s, p, o); } + + @Override + protected Stream streamInAnyNamedGraphs(Node s, Node p, Node o) { + return streamInSpecificNamedGraph(ANY, s, p, o); + } } diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/util/DyadicDatasetGraph.java b/jena-arq/src/main/java/org/apache/jena/sparql/util/DyadicDatasetGraph.java index a8d2a114229..d05602e83ae 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/util/DyadicDatasetGraph.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/util/DyadicDatasetGraph.java @@ -30,6 +30,7 @@ import java.util.Iterator; import java.util.Objects; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.atlas.lib.PairOfSameType; @@ -207,6 +208,20 
@@ protected Iterator findInOneGraph(Node g, Node s, Node p, Node o) { return G.triples2quads(g, getGraph(g).find(s, p, o)); } + @Override + public Stream stream(Node g, Node s, Node p, Node o) { + if ( g.isConcrete() ) + return streamInOneGraph(g, s, p, o); + return Stream.concat( + Iter.asStream(listGraphNodes()).flatMap(gn -> streamInOneGraph(gn, s, p, o)), + streamInOneGraph(defaultGraphIRI, s, p, o) + ); + } + + protected Stream streamInOneGraph(Node g, Node s, Node p, Node o) { + return G.triples2quads(g, getGraph(g).stream(s, p, o)); + } + @Override public Graph getUnionGraph() { return new MultiUnion(map(listGraphNodes(), this::getGraph)); diff --git a/jena-arq/src/main/java/org/apache/jena/system/G.java b/jena-arq/src/main/java/org/apache/jena/system/G.java index e9c1cab044e..538f41bd333 100644 --- a/jena-arq/src/main/java/org/apache/jena/system/G.java +++ b/jena-arq/src/main/java/org/apache/jena/system/G.java @@ -28,6 +28,7 @@ import java.util.function.Consumer; import java.util.function.Predicate; import java.util.function.Supplier; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.atlas.lib.Copyable; @@ -792,6 +793,10 @@ private static Triple first(ExtendedIterator iter) { public static Iter quads2triples(Iterator iter) { return Iter.iter(iter).map(Quad::asTriple); } + /** Project quads to triples */ + public static Stream quads2triples(Stream stream) + { return stream.map(Quad::asTriple); } + /** Project quad to graph name */ public static Iterator quad2graphName(Iterator iter) { return Iter.map(iter, Quad::getGraph); } @@ -897,6 +902,11 @@ public static Iter triples2quads(Node graphNode, Iterator iter) { return Iter.iter(iter).map(t -> Quad.create(graphNode, t)); } + /** Convert a stream of triples into quads for the specified graph name. 
*/ + public static Stream triples2quads(Node graphNode, Stream stream) { + return stream.map(t -> Quad.create(graphNode, t)); + } + /** * Convert an iterator of triples into quads for the default graph. This is * {@link Quad#defaultGraphIRI}, not {@link Quad#defaultGraphNodeGenerated}, which is @@ -906,6 +916,15 @@ public static Iter triples2quadsDftGraph(Iterator iter) { return triples2quads(Quad.defaultGraphIRI, iter); } + /** + * Convert a stream of triples into quads for the default graph. This is + * {@link Quad#defaultGraphIRI}, not {@link Quad#defaultGraphNodeGenerated}, which is + * for quads outside a dataset, usually the output of parsers. + */ + public static Stream triples2quadsDftGraph(Stream stream) { + return triples2quads(Quad.defaultGraphIRI, stream); + } + /** * Execute a graph transaction if the graph supports transactions else apply * without a transaction wrapper. diff --git a/jena-arq/src/main/java/org/apache/jena/system/buffering/BufferingDatasetGraph.java b/jena-arq/src/main/java/org/apache/jena/system/buffering/BufferingDatasetGraph.java index abe42c46c2b..16f6087f639 100644 --- a/jena-arq/src/main/java/org/apache/jena/system/buffering/BufferingDatasetGraph.java +++ b/jena-arq/src/main/java/org/apache/jena/system/buffering/BufferingDatasetGraph.java @@ -27,6 +27,7 @@ import java.util.Iterator; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.graph.Graph; @@ -321,24 +322,70 @@ protected Iterator findInDftGraph(Node s, Node p, Node o) { return iter; } + @Override + protected Stream streamInDftGraph(Node s, Node p, Node o) { + readOperation(); + DatasetGraph base = get(); + Stream extra = streamInAddedTriples(s, p, o); + Stream stream = + Stream.concat( + base.stream(Quad.defaultGraphIRI, s, p, o) + .filter(q->! deletedTriples.contains(q.asTriple())), /* NOTE(review): default-graph deletes are assumed to be buffered as triples in deletedTriples, mirroring addedTriples - confirm against findInDftGraph */ + extra); + if ( ! 
UNIQUE ) + stream = stream.distinct(); + return stream; + } + private Iterator findInAddedTriples(Node s, Node p, Node o) { return Iter.iter(addedTriples.iterator()) .filter(t->match(t,s,p,o)) .map(t->Quad.create(Quad.defaultGraphIRI,t)); } + private Stream streamInAddedTriples(Node s, Node p, Node o) { + return addedTriples.stream() + .filter(t->match(t,s,p,o)) + .map(t->Quad.create(Quad.defaultGraphIRI,t)); + } + @Override protected Iterator findInSpecificNamedGraph(Node g, Node s, Node p, Node o) { readOperation(); return findQuads(g, s, p, o); } + @Override + protected Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + readOperation(); + return streamQuads(g, s, p, o); + } + @Override protected Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { readOperation(); return findQuads(Node.ANY, s, p, o); } + @Override + protected Stream streamInAnyNamedGraphs(Node s, Node p, Node o) { + readOperation(); + return streamQuads(Node.ANY, s, p, o); + } + + private Stream streamQuads(Node g, Node s, Node p, Node o) { + DatasetGraph base = get(); + Stream extra = streamInAddedQuads(g, s, p, o); + Stream stream = + Stream.concat( + base.stream(g, s, p, o) + .filter(q->! deletedQuads.contains(q)), + extra); + if ( ! UNIQUE ) + stream = stream.distinct(); + return stream; + } + private Iterator findQuads(Node g, Node s, Node p, Node o) { DatasetGraph base = get(); Iterator extra = findInAddedQuads(g, s, p, o); @@ -356,6 +403,11 @@ private Iterator findInAddedQuads(Node g, Node s, Node p, Node o) { .filter(t->match(t,g,s,p,o)); } + private Stream streamInAddedQuads(Node g, Node s, Node p, Node o) { + return addedQuads.stream() + .filter(t-> match(t,g,s,p,o)); + } + // Graphs: read/write operations will come back to the dataset. 
@Override public Graph getDefaultGraph() { diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/AbstractDatasetGraphFind.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/AbstractDatasetGraphFind.java index 1fa65b54fd8..f189bf26d54 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/AbstractDatasetGraphFind.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/AbstractDatasetGraphFind.java @@ -286,6 +286,111 @@ public void find_union_05() { assertTrue(x.contains(q5)); } + // stream(g,s,p,o) must route to the same primitives as find(g,s,p,o) and yield the same + // results, regardless of whether the implementation is stream-native, graph-backed, or + // iterator-native (bridging default). These mirror the find_* tests above. + + @Test public void stream_specific_01() { + List x = dsg.stream(g1, null, null, null).toList(); + assertEquals(2, x.size()); + assertTrue(x.contains(q4)); + assertTrue(x.contains(q3)); + } + + @Test public void stream_specific_02() { + List x = dsg.stream(g1, null, null, NodeConst.nodeOne).toList(); + assertEquals(1, x.size()); + assertTrue(x.contains(q4)); + } + + @Test public void stream_dft_01() { + List x = dsg.stream(Quad.defaultGraphIRI, null, null, null).toList(); + assertEquals(2, x.size()); + assertTrue(x.contains(q1)); + assertTrue(x.contains(q2)); + } + + @Test public void stream_dft_02() { + List x = dsg.stream(Quad.defaultGraphIRI, null, null, NodeConst.nodeOne).toList(); + assertEquals(0, x.size()); + } + + @Test public void stream_dft_03() { + List x = dsg.stream(Quad.defaultGraphIRI, null, null, NodeConst.nodeZero).toList(); + assertEquals(1, x.size()); + assertTrue(x.contains(q2)); + } + + @Test public void stream_union_01() { + List x = dsg.stream(Quad.unionGraph, null, null, null).toList(); + assertEquals(3, x.size()); + assertTrue(x.stream().allMatch(q->q.getGraph().equals(Quad.unionGraph))); + List z = x.stream().map(Quad::asTriple).toList(); + assertTrue(z.contains(q4.asTriple())); 
+ assertTrue(z.contains(q5.asTriple())); + assertTrue(x.contains(Quad.create(Quad.unionGraph, q4.asTriple()))); + assertFalse(x.contains(Quad.create(Quad.unionGraph, q2.asTriple()))); + } + + /** stream(g,s,p,o) and find(g,s,p,o) must agree for every access pattern. */ + @Test public void stream_matches_find() { + Node[][] patterns = { + { null, null, null, null }, // findAny / streamAny + { Quad.defaultGraphIRI, null, null, null }, // default graph + { g1, null, null, null }, // specific named graph + { Quad.unionGraph, null, null, null }, // union graph (distinct) + { null, s, p, o }, // pattern across all + { null, null, null, NodeConst.nodeOne }, + { Quad.unionGraph, null, null, o }, // union graph, with object + }; + for ( Node[] pat : patterns ) { + List viaFind = toList(dsg.find(pat[0], pat[1], pat[2], pat[3])); + List viaStream = dsg.stream(pat[0], pat[1], pat[2], pat[3]).toList(); + assertEqualsUnordered(viaFind, viaStream); + } + } + + // DatasetGraphBaseFind specific: the stream-first union helpers and streamNG. 
+ + @Test public void stream_dsgFind_union_02() { + assumeTrue(dsg instanceof DatasetGraphBaseFind, ()->"Not a DatasetGraphBaseFind"); + DatasetGraphBaseFind dsgx = (DatasetGraphBaseFind)dsg; + List x = dsgx.streamQuadsInUnionGraph(null, null, null).map(Quad::asTriple).toList(); + assertEquals(3, x.size()); + assertTrue(x.contains(q4.asTriple())); + assertTrue(x.contains(q5.asTriple())); + assertTrue(x.contains(q10.asTriple())); + } + + @Test public void stream_dsgFind_union_03() { + assumeTrue(dsg instanceof DatasetGraphBaseFind, ()->"Not a DatasetGraphBaseFind"); + DatasetGraphBaseFind dsgx = (DatasetGraphBaseFind)dsg; + List x1 = dsgx.streamQuadsInUnionGraph(null, null, null).map(Quad::asTriple).toList(); + List x2 = toList(dsgx.findInUnionGraph(null, null, null)); + assertEqualsUnordered(x1, x2); + List x3 = dsgx.streamQuadsInUnionGraph(null, null, null).toList(); + assertEquals(3, x3.size()); + assertTrue(x3.stream().allMatch(q->q.getGraph().equals(Quad.unionGraph))); + } + + @Test public void stream_dsgFind_union_05() { + assumeTrue(dsg instanceof DatasetGraphBaseFind, ()->"Not a DatasetGraphBaseFind"); + DatasetGraphBaseFind dsgx = (DatasetGraphBaseFind)dsg; + List x1 = dsgx.streamQuadsInUnionGraph(null, null, o).map(Quad::asTriple).toList(); + List x2 = quadsToDistinctTriples(dsg.find(Quad.unionGraph, null, null, o)); + assertEqualsUnordered(x1, x2); + assertEquals(1, x2.size()); + } + + @Test public void stream_dsgFind_ng() { + assumeTrue(dsg instanceof DatasetGraphBaseFind, ()->"Not a DatasetGraphBaseFind"); + DatasetGraphBaseFind dsgx = (DatasetGraphBaseFind)dsg; + assertEqualsUnordered(toList(dsgx.findNG(null, null, null, null)), + dsgx.streamNG(null, null, null, null).toList()); + assertEqualsUnordered(toList(dsgx.findNG(null, s, p, o)), + dsgx.streamNG(null, s, p, o).toList()); + } + public static void assertEqualsUnordered(List list1, List list2) { if ( ! 
ListUtils.equalsUnordered(list1, list2) ) fail(msg(null, list1, list2)); diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/AbstractTestGraphOverDatasetGraph.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/AbstractTestGraphOverDatasetGraph.java index 1f35694ed1d..c0f7f43beeb 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/AbstractTestGraphOverDatasetGraph.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/AbstractTestGraphOverDatasetGraph.java @@ -26,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Iterator; +import java.util.stream.Collectors; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -112,6 +113,32 @@ public void graphDSG_view_union_1() { assertEquals(2, g.size()); } + // ---- Graph#stream() over a dataset must match Graph#find() (GraphView.stream override). + + @Test + public void graphDSG_stream_dft() { + Graph g = makeDefaultGraph(baseDSG); + assertEquals(1, g.stream(null, null, null).count()); + assertEquals(SSE.parseTriple("(

0)"), g.stream().findFirst().orElse(null)); + assertEquals(Iter.toSet(g.find(null, null, null)), g.stream(null, null, null).collect(Collectors.toSet())); + } + + @Test + public void graphDSG_stream_named() { + Graph g = makeNamedGraph(baseDSG, gn1); + assertEquals(1, g.stream(null, null, null).count()); + assertEquals(Iter.toSet(g.find(null, null, null)), g.stream(null, null, null).collect(Collectors.toSet())); + } + + @Test + public void graphDSG_stream_union() { + Graph g = makeNamedGraph(baseDSG, Quad.unionGraph); + // The union view de-duplicates the triple shared by g2 and g3 -> 2 distinct triples. + assertEquals(2, g.stream(null, null, null).count()); + assertEquals(Iter.toSet(g.find(null, null, null)), g.stream(null, null, null).collect(Collectors.toSet())); + } + + // ---- prefixes @Test diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/DatasetGraphSimpleMem.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/DatasetGraphSimpleMem.java index e82c0dcca01..838a3302c22 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/DatasetGraphSimpleMem.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/DatasetGraphSimpleMem.java @@ -24,6 +24,7 @@ import static org.apache.jena.system.G.nullAsAny; import java.util.*; +import java.util.stream.Stream; import org.apache.jena.graph.Graph; import org.apache.jena.graph.Node; @@ -90,33 +91,60 @@ public boolean supportsTransactions() { return false; } - @Override - public Iterator findInDftGraph(Node s, Node p, Node o) { + private List collectInDftGraph(Node s, Node p, Node o) { List results = new ArrayList<>(); for ( Triple t : triples ) if ( matches(t, s, p, o) ) // ?? 
Quad.defaultGraphNodeGenerated // Quad.defaultGraphIRI results.add(Quad.create(Quad.defaultGraphIRI, t)); - return results.iterator(); + return results; } @Override - public Iterator findInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + public Iterator findInDftGraph(Node s, Node p, Node o) { + return collectInDftGraph(s, p, o).iterator(); + } + + @Override + protected Stream streamInDftGraph(Node s, Node p, Node o) { + return collectInDftGraph(s, p, o).stream(); + } + + private List collectInSpecificNamedGraph(Node g, Node s, Node p, Node o) { List results = new ArrayList<>(); for ( Quad q : quads ) if ( matches(q, g, s, p, o) ) results.add(q); - return results.iterator(); + return results; } @Override - public Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { + public Iterator findInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + return collectInSpecificNamedGraph(g, s, p, o).iterator(); + } + + @Override + protected Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + return collectInSpecificNamedGraph(g, s, p, o).stream(); + } + + private List collectInAnyNamedGraphs(Node s, Node p, Node o) { List results = new ArrayList<>(); for ( Quad q : quads ) if ( matches(q, Node.ANY, s, p, o) ) results.add(q); - return results.iterator(); + return results; + } + + @Override + public Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { + return collectInAnyNamedGraphs(s, p, o).iterator(); + } + + @Override + protected Stream streamInAnyNamedGraphs(Node s, Node p, Node o) { + return collectInAnyNamedGraphs(s, p, o).stream(); } private boolean matches(Triple t, Node s, Node p, Node o) { diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/TestDatasetGraphFilteredView.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/TestDatasetGraphFilteredView.java index 001323a48ad..6f4f980f562 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/TestDatasetGraphFilteredView.java +++ 
b/jena-arq/src/test/java/org/apache/jena/sparql/core/TestDatasetGraphFilteredView.java @@ -22,12 +22,14 @@ package org.apache.jena.sparql.core; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Set; import java.util.function.Predicate; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; @@ -178,6 +180,36 @@ void setup() { }); } + // stream() must apply the same filter as find(). DatasetGraphWrapper.stream(...) forwards to + // the wrapped dataset, so these guard against the filter being bypassed for stream access. + + @Test public void filtered_stream_2() { + Predicate filter = x->x.getGraph().equals(g2); + Txn.executeRead(basedsg(),()->{ + DatasetGraph dsg = new DatasetGraphFilteredView(basedsg(),filter, Collections.singleton(g1)); + assertEquals(2, dsg.stream(null, null, null, null).count()); + assertEquals(2, dsg.stream(g2, null, null, null).count()); + assertEquals(2, dsg.stream(null, s2, null, null).count()); + assertEquals(0, dsg.stream(g1, null, null, null).count()); + // No leakage: stream() returns exactly the filtered find() result. + assertEquals(Iter.toSet(dsg.find()), dsg.stream().collect(Collectors.toSet())); + }); + } + + @Test public void filtered_stream_matches_find() { + Predicate filter = x-> x.getSubject().equals(s2) || x.getSubject().equals(s1); + Txn.executeRead(basedsg(),()->{ + DatasetGraph dsg = new DatasetGraphFilteredView(basedsg(),filter, Arrays.asList(g1, g2)); + // Sanity: the unfiltered base really does contain more quads than the filtered view. + assertEquals(3, dsg.stream().count()); + assertTrue(Iter.count(basedsg().find()) > 3); + // stream() and find() agree under filtering, for the whole dataset and per-pattern. 
+ assertEquals(Iter.toSet(dsg.find()), dsg.stream().collect(Collectors.toSet())); + assertEquals(Iter.toSet(dsg.find(g2, null, null, null)), + dsg.stream(g2, null, null, null).collect(Collectors.toSet())); + }); + } + private void assertSame(DatasetGraph dsg1, DatasetGraph dsg2) { Set quads1 = Iter.toSet(dsg1.find()); Set quads2 = Iter.toSet(dsg2.find()); diff --git a/jena-arq/src/test/java/org/apache/jena/sparql/core/TestSpecialDatasets.java b/jena-arq/src/test/java/org/apache/jena/sparql/core/TestSpecialDatasets.java index 0ed2d09caef..dbc4aa359b6 100644 --- a/jena-arq/src/test/java/org/apache/jena/sparql/core/TestSpecialDatasets.java +++ b/jena-arq/src/test/java/org/apache/jena/sparql/core/TestSpecialDatasets.java @@ -23,6 +23,7 @@ import static org.junit.jupiter.api.Assertions.*; +import org.apache.jena.atlas.iterator.Iter; import org.junit.jupiter.api.Test; import org.apache.jena.graph.Graph; @@ -38,6 +39,8 @@ import org.apache.jena.sparql.sse.SSE; import org.apache.jena.system.Txn; +import java.util.stream.Collectors; + /** Tests for * {@link DatasetGraphZero}, * {@link DatasetGraphSink} and @@ -286,4 +289,47 @@ public void sink_graph_txn_5() { TransactionHandler h = g.getTransactionHandler(); assertThrows(JenaException.class,()-> h.commit() ); } + + // -- one : DatasetGraphOne wraps a single graph as the default graph. + // Exercises the direct stream(g,s,p,o) override. 
+ + private static Node o2 = SSE.parseNode(":o2"); + + private static DatasetGraph oneWithData() { + Graph g = GraphFactory.createGraphMem(); + g.add(triple); // (:s :p :o) + g.add(SSE.parseTriple("(:s :p :o2)")); + return DatasetGraphFactory.wrap(g); + } + + @Test public void one_stream_all() { + DatasetGraph dsg = oneWithData(); + assertEquals(2, dsg.stream().count()); + assertEquals(Iter.toSet(dsg.find()), dsg.stream().collect(Collectors.toSet())); + } + + @Test public void one_stream_dft() { + DatasetGraph dsg = oneWithData(); + assertEquals(2, dsg.stream(Quad.defaultGraphIRI, null, null, null).count()); + assertEquals(1, dsg.stream(Quad.defaultGraphIRI, null, null, o2).count()); + } + + @Test public void one_stream_named_empty() { + DatasetGraph dsg = oneWithData(); + // DatasetGraphOne has no named graphs: a specific named graph streams nothing. + assertEquals(0, dsg.stream(gn, null, null, null).count()); + } + + @Test public void one_stream_matches_find() { + DatasetGraph dsg = oneWithData(); + Node[][] patterns = { + { null, null, null, null }, + { Quad.defaultGraphIRI, null, null, null }, + { gn, null, null, null }, + { null, null, null, o2 }, + }; + for ( Node[] p : patterns ) + assertEquals(Iter.toSet(dsg.find(p[0], p[1], p[2], p[3])), + dsg.stream(p[0], p[1], p[2], p[3]).collect(Collectors.toSet())); + } } diff --git a/jena-arq/src/test/java/org/apache/jena/system/TestG_Quad.java b/jena-arq/src/test/java/org/apache/jena/system/TestG_Quad.java index f68a1935a88..64f222879da 100644 --- a/jena-arq/src/test/java/org/apache/jena/system/TestG_Quad.java +++ b/jena-arq/src/test/java/org/apache/jena/system/TestG_Quad.java @@ -34,6 +34,8 @@ import org.apache.jena.sparql.sse.SSE; import org.apache.jena.sys.JenaSystem; +import java.util.stream.Stream; + public class TestG_Quad { static { JenaSystem.init(); } @@ -69,4 +71,17 @@ private static DatasetGraph dataset(String trigBody) { String setup = "PREFIX : \n"; return RDFParser.fromString(setup+trigBody, 
Lang.TRIG).toDatasetGraph(); } + + @Test + public void triples2quads() { + var quad = SSE.parseQuad("(:g :s :p :o)"); + var q = G.triples2quads(quad.getGraph(), Stream.of(quad.asTriple())).findFirst().orElseThrow(); + assertEquals(quad, q); + } + + @Test void triples2quadsDftGraph() { + var triple = SSE.parseTriple("(:s :p :o)"); + var quad = G.triples2quadsDftGraph(Stream.of(triple)).findFirst().orElseThrow(); + assertEquals(Quad.create(Quad.defaultGraphIRI, triple), quad); + } } diff --git a/jena-arq/src/test/java/org/apache/jena/system/TestG_Triple.java b/jena-arq/src/test/java/org/apache/jena/system/TestG_Triple.java index f3a44b1114d..89ef811fb43 100644 --- a/jena-arq/src/test/java/org/apache/jena/system/TestG_Triple.java +++ b/jena-arq/src/test/java/org/apache/jena/system/TestG_Triple.java @@ -35,6 +35,8 @@ import org.apache.jena.sparql.sse.SSE; import org.apache.jena.sys.JenaSystem; +import java.util.stream.Stream; + public class TestG_Triple { static { JenaSystem.init(); } @@ -100,4 +102,11 @@ private static Graph graph(String ttlBody) { String setup = "PREFIX : \n"; return RDFParser.fromString(setup+ttlBody, Lang.TURTLE).toGraph(); } + + @Test + public void quads2triples() { + var quad = SSE.parseQuad("(:g :s :p :o)"); + var triple = G.quads2triples(Stream.of(quad)).findFirst().orElseThrow(); + assertEquals(SSE.parseTriple("(:s :p :o)"), triple); + } } diff --git a/jena-arq/src/test/java/org/apache/jena/system/buffering/TestBufferingDatasetGraph.java b/jena-arq/src/test/java/org/apache/jena/system/buffering/TestBufferingDatasetGraph.java index 407822290a4..97a7ce55778 100644 --- a/jena-arq/src/test/java/org/apache/jena/system/buffering/TestBufferingDatasetGraph.java +++ b/jena-arq/src/test/java/org/apache/jena/system/buffering/TestBufferingDatasetGraph.java @@ -21,14 +21,15 @@ package org.apache.jena.system.buffering; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; -import static 
org.junit.jupiter.api.Assertions.assertThrows; - +import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.jena.atlas.iterator.Iter; +import org.apache.jena.graph.Node; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedClass; import org.junit.jupiter.params.provider.Arguments; @@ -41,6 +42,8 @@ import org.apache.jena.sparql.core.Quad; import org.apache.jena.sparql.sse.SSE; +import static org.junit.jupiter.api.Assertions.*; + @ParameterizedClass @MethodSource("provideArgs") @@ -190,4 +193,59 @@ public void buffered_5() { buffered.reset(); assertTrue(buffered.isEmpty()); } + + // ---- stream() must agree with find(), including for named graphs. + + @Test public void stream_buffered_named_graph() { + Quad q = SSE.parseQuad("(:g :s :p :o)"); // a buffered, un-flushed named-graph quad + buffered.execute(() -> { + buffered.add(q); + Node g = q.getGraph(); + assertEquals(1L, buffered.stream(g, Node.ANY, Node.ANY, Node.ANY).count()); // specific named graph + assertEquals(1L, buffered.stream().count()); // all quads + assertEquals(1L, buffered.stream(Quad.unionGraph, Node.ANY, Node.ANY, Node.ANY).count()); // union graph + // find(unionGraph, ...) shares the same primitive: guard against the iterator regression. + assertEquals(1L, Iter.count(buffered.find(Quad.unionGraph, Node.ANY, Node.ANY, Node.ANY))); + // Whole dataset: stream() returns exactly what find() returns. 
+ assertEquals(Iter.toSet(buffered.find()), buffered.stream().collect(Collectors.toSet())); + }); + } + + @Test public void stream_matches_find_overlay() { + Node g1 = SSE.parseNode(":g1"); + Node g2 = SSE.parseNode(":g2"); + Node s = SSE.parseNode(":s"); + Quad baseG1 = SSE.parseQuad("(:g1 :s :p 1)"); + Quad baseG2 = SSE.parseQuad("(:g2 :s :p 2)"); // deleted in the buffer + Quad baseDft = Quad.create(Quad.defaultGraphIRI, SSE.parseTriple("(:s :p 3)")); + Quad addG1 = SSE.parseQuad("(:g1 :s :p 4)"); // buffered add (named) + Quad addDft = Quad.create(Quad.defaultGraphIRI, SSE.parseTriple("(:s :p 5)")); // buffered add (default) + + buffered.executeWrite(() -> { + base.add(baseG1); base.add(baseG2); base.add(baseDft); + buffered.add(addG1); buffered.add(addDft); buffered.delete(baseG2); + + // The buffered overlay (base + added - deleted) must be identical via stream() and find(). + Node[][] patterns = { + { Node.ANY, Node.ANY, Node.ANY, Node.ANY }, // everything + { Quad.defaultGraphIRI, Node.ANY, Node.ANY, Node.ANY }, // default graph + { g1, Node.ANY, Node.ANY, Node.ANY }, // specific named graph (base + added) + { g2, Node.ANY, Node.ANY, Node.ANY }, // named graph emptied by a buffered delete + { Quad.unionGraph, Node.ANY, Node.ANY, Node.ANY }, // union of named graphs (distinct triples) + { Node.ANY, s, Node.ANY, Node.ANY }, // pattern spanning graphs + }; + for ( Node[] pat : patterns ) { + Set viaFind = Iter.toSet(buffered.find(pat[0], pat[1], pat[2], pat[3])); + Set viaStream = buffered.stream(pat[0], pat[1], pat[2], pat[3]).collect(Collectors.toSet()); + assertEquals(viaFind, viaStream, () -> "stream() != find() for " + Arrays.toString(pat)); + } + + // Concrete overlay expectations. (Union parity is covered by the loop above; its exact + // contents are left to find()/stream() agreement, since BufferingDatasetGraph's + // "any named graph" view also surfaces the default graph - pre-existing, out of scope here.) 
+ assertEquals(2L, buffered.stream(g1, Node.ANY, Node.ANY, Node.ANY).count()); // base + buffered add + assertEquals(0L, buffered.stream(g2, Node.ANY, Node.ANY, Node.ANY).count()); // fully deleted + assertEquals(2L, buffered.stream(Quad.defaultGraphIRI, Node.ANY, Node.ANY, Node.ANY).count()); // base + buffered add + }); + } } diff --git a/jena-db/jena-dboe-storage/src/main/java/org/apache/jena/dboe/storage/system/DatasetGraphStorage.java b/jena-db/jena-dboe-storage/src/main/java/org/apache/jena/dboe/storage/system/DatasetGraphStorage.java index bdacc7b08ac..ecb72e20b1e 100644 --- a/jena-db/jena-dboe-storage/src/main/java/org/apache/jena/dboe/storage/system/DatasetGraphStorage.java +++ b/jena-db/jena-dboe-storage/src/main/java/org/apache/jena/dboe/storage/system/DatasetGraphStorage.java @@ -22,6 +22,7 @@ package org.apache.jena.dboe.storage.system; import java.util.Iterator; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter; import org.apache.jena.dboe.storage.DatabaseRDF; @@ -45,6 +46,7 @@ import org.apache.jena.sparql.core.DatasetGraphTriplesQuads; import org.apache.jena.sparql.core.Quad; import org.apache.jena.sparql.core.Transactional; +import org.apache.jena.system.G; /** Alternative: DatasetGraph over RDFStorage, using DatasetGraphBaseFind * Collapses DatasetGraphTriplesQuads into this adapter class. 
@@ -131,7 +133,12 @@ private Iterator findStorage(Node g, Node s, Node p, Node o) { @Override protected Iterator findInDftGraph(Node s, Node p, Node o) { - return Iter.map(findStorage(s, p, o), t -> Quad.create(Quad.defaultGraphIRI, t)); + return G.triples2quadsDftGraph(findStorage(s, p, o)); + } + + @Override + protected Stream streamInDftGraph(Node s, Node p, Node o) { + return Iter.asStream(findInDftGraph(s, p, o)); } @Override @@ -139,12 +146,22 @@ protected Iterator findInSpecificNamedGraph(Node g, Node s, Node p, Node o return findStorage(g, s, p, o); } + @Override + protected Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) { + return Iter.asStream(findInSpecificNamedGraph(g, s, p, o)); + } + @Override protected Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { // Implementations may wish to do better. return findStorage(Node.ANY, s, p, o); } + @Override + protected Stream streamInAnyNamedGraphs(Node s, Node p, Node o) { + return Iter.asStream(findInAnyNamedGraphs(s, p, o)); + } + @Override public Graph getDefaultGraph() { return GraphViewStorage.createDefaultGraphStorage(this, prefixes); diff --git a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/IteratorTxnTracker.java b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/IteratorTxnTracker.java index d04b4462750..f8db2011127 100644 --- a/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/IteratorTxnTracker.java +++ b/jena-db/jena-dboe-transaction/src/main/java/org/apache/jena/dboe/transaction/txn/IteratorTxnTracker.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.Objects; +import java.util.function.Consumer; import org.apache.jena.atlas.iterator.IteratorWrapper; @@ -43,6 +44,12 @@ public IteratorTxnTracker(Iterator iterator, TransactionalSystem txnSystem, T @Override public void remove() { check() ; super.remove() ; } + @Override + public void forEachRemaining(Consumer 
action) { + check() ; + super.forEachRemaining(action) ; + } + private void check() { Transaction txn = txnSystem.getThreadTransaction(); if ( txn == null ) diff --git a/jena-tdb1/src/main/java/org/apache/jena/tdb1/store/DatasetGraphTDB.java b/jena-tdb1/src/main/java/org/apache/jena/tdb1/store/DatasetGraphTDB.java index 3bd4af27f01..ffe0ce51a22 100644 --- a/jena-tdb1/src/main/java/org/apache/jena/tdb1/store/DatasetGraphTDB.java +++ b/jena-tdb1/src/main/java/org/apache/jena/tdb1/store/DatasetGraphTDB.java @@ -23,6 +23,7 @@ import java.util.Iterator ; +import java.util.stream.Stream; import org.apache.jena.atlas.iterator.Iter ; import org.apache.jena.atlas.lib.Closeable ; @@ -83,14 +84,26 @@ public DatasetGraphTDB(TripleTable tripleTable, QuadTable quadTable, DatasetPref protected Iterator findInDftGraph(Node s, Node p, Node o) { return G.triples2quadsDftGraph(getTripleTable().find(s, p, o)) ; } + @Override + protected Stream streamInDftGraph(Node s, Node p, Node o) + { return Iter.asStream(findInDftGraph(s, p, o)) ; } + @Override protected Iterator findInSpecificNamedGraph(Node g, Node s, Node p, Node o) { return getQuadTable().find(g, s, p, o) ; } + @Override + protected Stream streamInSpecificNamedGraph(Node g, Node s, Node p, Node o) + { return Iter.asStream(findInSpecificNamedGraph(g, s, p, o)) ; } + @Override protected Iterator findInAnyNamedGraphs(Node s, Node p, Node o) { return getQuadTable().find(Node.ANY, s, p, o) ; } + @Override + protected Stream streamInAnyNamedGraphs(Node s, Node p, Node o) + { return Iter.asStream(findInAnyNamedGraphs(s, p, o)) ; } + @Override protected void addToDftGraph(Node s, Node p, Node o) { getTripleTable().add(s,p,o) ; } diff --git a/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/GraphViewSwitchable.java b/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/GraphViewSwitchable.java index 22f04c2f17a..a6daa694f27 100644 --- a/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/GraphViewSwitchable.java +++ 
b/jena-tdb2/src/main/java/org/apache/jena/tdb2/store/GraphViewSwitchable.java @@ -28,6 +28,8 @@ import org.apache.jena.sparql.core.Quad; import org.apache.jena.util.iterator.ExtendedIterator; +import java.util.stream.Stream; + /** * A GraphView that is sensitive to {@link DatasetGraphSwitchable} switching. * This ensures that a graph object remains valid as the {@link DatasetGraphSwitchable} switches. @@ -93,6 +95,11 @@ protected ExtendedIterator graphBaseFind(Node s, Node p, Node o) { return getBaseGraph().find(s, p, o); } + @Override + public Stream stream(Node s, Node p, Node o) { + return getBaseGraph().stream(s, p, o); + } + private DatasetGraphTDB getDSG() { return ((DatasetGraphTDB)(getx().get())); }