Welcome to OGeek Q&A Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others


java - 2 consecutive stream-stream inner joins produce wrong results: what does KStream join between streams really do internally?

The problem setting

I have a stream of nodes and a stream of edges that represent consecutive updates of a graph and I want to build patterns composed of nodes and edges using multiple joins in series. Let's suppose I want to match a pattern like: (node1) --[edge1]--> (node2).
My idea is to join the stream of nodes with the stream of edges to compose a stream of sub-patterns of type (node1) --[edge1]-->, then take the resulting stream and join it with the stream of nodes again to compose the final pattern (node1) --[edge1]--> (node2). Filtering on particular node and edge types is not important here.
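
A minimal sketch of the two-step composition described above, in plain Java and without Kafka. It ignores streaming, time windows, and Avro, and uses hypothetical stand-ins (the class `TwoStepPatternSketch`, a trimmed-down `Edge`, nodes represented as strings) that are not part of the original code. It only illustrates how a sub-pattern is first built from a node and an edge and then completed with the node at the edge's other end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoStepPatternSketch {

    // Hypothetical minimal edge: only the fields needed for matching.
    static final class Edge {
        final long startId;
        final long endId;
        final String type;

        Edge(long startId, long endId, String type) {
            this.startId = startId;
            this.endId = endId;
            this.type = type;
        }
    }

    static List<String> match(Map<Long, String> nodesById, List<Edge> edges) {
        List<String> patterns = new ArrayList<>();
        for (Edge edge : edges) {
            // Step 1: pair the edge with its start node -> "(node1) --[edge1]-->"
            String start = nodesById.get(edge.startId);
            if (start == null) {
                continue; // inner join: no start node, no sub-pattern
            }
            String subPattern = "(" + start + ") --[" + edge.type + "]-->";

            // Step 2: pair the sub-pattern with the node at the edge's end
            String end = nodesById.get(edge.endId);
            if (end == null) {
                continue; // inner join: no end node, no final pattern
            }
            patterns.add(subPattern + " (" + end + ")");
        }
        return patterns;
    }

    public static void main(String[] args) {
        Map<Long, String> nodes = new HashMap<>();
        nodes.put(1L, "node1");
        nodes.put(4L, "node4");
        List<Edge> edges = Arrays.asList(
                new Edge(1, 4, "related_to"),
                new Edge(1, 7, "related_to")); // node 7 is unknown, so this edge yields nothing
        System.out.println(match(nodes, edges));
        // prints [(node1) --[related_to]--> (node4)]
    }
}
```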

Data model

So I have nodes, edges and patterns structured in Avro format:

{
  "namespace": "DataModel",
  "type": "record",
  "name": "Node",
  "doc": "Node schema, it contains a nodeID label and properties",
  "fields": [
    {
      "name": "nodeID",
      "type": "long"
    },
    {
      "name": "labels",
      "type": {
        "type": "array",
        "items": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "timestamp",
      "type": "long"
    }
  ]
}

{
  "namespace": "DataModel",
  "type": "record",
  "name": "Edge",
  "doc": "contains edgeID, a type, a list of properties, a starting node ID and an ending node ID ",
  "fields": [
    {
      "name": "edgeID",
      "type": "long"
    },
    {
      "name": "type",
      "type": "string"
    },
    {
      "name": "properties",
      "type": {
        "type": "map",
        "values": "string",
        "avro.java.string": "String"
      }
    },
    {
      "name": "startID",
      "type": "long"
    },
    {
      "name": "endID",
      "type": "long"
    },
    {
      "name": "timestamp",
      "type": "long"
    }
  ]
}

{
    "namespace": "DataModel",
    "type": "record",
    "name": "Pattern",
    "fields": [
      {
        "name": "first",
        "type": "long"
      },
      {
        "name": "nextJoinID",
        "type": [
          "null",
          "long"
        ],
        "default": null
      },
      {
        "name": "timestamp",
        "type": "long"
      },
      {
        "name": "segments",
        "doc": "It's the ordered list of nodes and edges that compose this sub-pattern from the leftmost node to the rightmost edge or node",
        "type": {
          "type": "array",
          "items": [
            "DataModel.Node",
            "DataModel.Edge"
          ]
        }
      }
    ]
}

Then I have the following two ValueJoiners:

The first is used for the inner join of the node stream with the edge stream.
The second is used for the inner join of the sub-pattern stream with the node stream.

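import java.util.Arrays;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class NodeEdgeJoiner implements ValueJoiner<Node, Edge, Pattern> {

    @Override
    public Pattern apply(Node node, Edge edge) {

        // start a new segments list: the node followed by the edge
        Object[] segments = {node,edge};
        return Pattern.newBuilder()
                .setFirst(node.getNodeID())
                .setNextJoinID(edge.getEndID()) // the next join matches on the edge's end node
                .setSegments(Arrays.asList(segments))
                .setTimestamp(Math.min(node.getTimestamp(),edge.getTimestamp()))
                .build();

    }
}

import java.util.List;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class PatternNodeJoiner implements ValueJoiner<Pattern, Node, Pattern> {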

    @Override
    public Pattern apply(Pattern pattern, Node node) {

        List<Object> segments = pattern.getSegments();
        segments.add(node);
        return Pattern.newBuilder()
                .setFirst(pattern.getFirst())
                .setNextJoinID(node.getNodeID())
                .setSegments(segments)
                .setTimestamp(Math.min(node.getTimestamp(),pattern.getTimestamp()))
                .build();
    }
}

My intention is to catch patterns like (nodeId == 1)--[label == "related_to"]-->(), where

  • (nodeId == 1) represents a node with id=1
  • --[label == "related_to"]--> represents a directed edge with label = "related_to"
  • () represents a generic node.

The idea for concatenating those pieces is to perform two consecutive joins using the ValueJoiners above. Note the first operation performed by both ValueJoiners: to build the pattern, I simply append nodes and edges to the end of a list that is part of the Avro schema of a Pattern. The following is the generic loop that produces nodes and edges and publishes them to the corresponding topics. The key of each node record is its nodeID, and the key of each edge record is the nodeID of the edge's start node (startID).

while (true) {

    // Open both producers in the same try-with-resources so that both are
    // flushed and closed on every iteration (the edge producer was never closed).
    try (final KafkaProducer<Long, Node> nodeKafkaProducer = new KafkaProducer<Long, Node>(props);
         final KafkaProducer<Long, Edge> edgeKafkaProducer = new KafkaProducer<Long, Edge>(props)) {

        nodeKafkaProducer.send(new ProducerRecord<Long, Node>(nodeTopic, 1L,
                buildNodeRecord(1, Collections.singletonList("aString"), "aString",
                        System.currentTimeMillis())));
        edgeKafkaProducer.send(new ProducerRecord<Long, Edge>(edgesTopic, 1L,
                buildEdgeRecord(1, 1, 4, "related_to", "aString",
                        System.currentTimeMillis())));
        Thread.sleep(9000);

    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

where:

private Node buildNodeRecord(long nodeId, List<String> labelsToSet, String property, long timestamp){

        Node record = new Node();
        record.setNodeID(nodeId);
        record.setLabels(labelsToSet);
        Map<String, String> propMap = new HashMap<String, String>();
        propMap.put("property", property);
        record.setProperties(propMap);
        record.setTimestamp(timestamp);
        return record;
    }

private Edge buildEdgeRecord(long edgeId, long startID, long endID, String type, String property, long timestamp) {

        Edge record = new Edge();
        record.setEdgeID(edgeId);
        record.setStartID(startID);
        record.setEndID(endID);
        record.setType(type);

        Map<String,String> propMap = new HashMap<String, String>();
        propMap.put("property",property);
        record.setProperties(propMap);
        record.setTimestamp(timestamp);

        return record;
    }

The following part of the code describes the pipeline.

//configuration of specific avro serde for pattern type
        final SpecificAvroSerde<Pattern> patternSpecificAvroSerde = new SpecificAvroSerde<>();
        final Map<String, String> serdeConfig = Collections.singletonMap(
                AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, envProps.getProperty("schema.registry.url"));
        patternSpecificAvroSerde.configure(serdeConfig,false);

        //the valueJoiners we need
        final NodeEdgeJoiner nodeEdgeJoiner = new NodeEdgeJoiner();
        final PatternNodeJoiner patternNodeJoiner = new PatternNodeJoiner();

        //timestampExtractors
        NodeTimestampExtractor nodeTimestampExtractor = new NodeTimestampExtractor();
        SubPatternTimeStampExtractor subPatternTimeStampExtractor = new SubPatternTimeStampExtractor();
        EdgeTimestampExtractor edgeTimestampExtractor = new EdgeTimestampExtractor();

        //node source
        final KStream<Long, Node> nodeKStream = builder.stream(envProps.getProperty("node.topic.name"),
                Consumed.with(nodeTimestampExtractor));

       //filter on nodes topic
        nodeKStream.filter((key, value) -> value.getNodeID()==1).to(envProps.getProperty("firstnodes.topic.name"));
        final KStream<Long,Node> firstFilteredNodes = builder.stream(envProps.getProperty("firstnodes.topic.name"),
                Consumed.with(nodeTimestampExtractor));

        //edges keyed by incoming node
        final KStream<Long,Edge> edgeKstream = builder.stream(envProps.getProperty("edge.topic.name"),
                Consumed.with(edgeTimestampExtractor));

        //filter operation on edges for the first part of the pattern
        final KStream<Long,Edge> firstEdgeFiltered = edgeKstream.filter((key, value) ->
                value.getType().equals("related_to"));

        //first join
        firstFilteredNodes.join(firstEdgeFiltered, nodeEdgeJoiner,
                JoinWindows.of(Duration.ofSeconds(10)))
                // re-key by the edge's end node ID so that the second join matches on it
                .map((key, value) -> new KeyValue<Long, Pattern>(value.getNextJoinID(), value))
                .to(envProps.getProperty("firstJoin.topic.name"));

        final KStream<Long, Pattern> mappedFirstJoin = builder.stream(envProps.getProperty("firstJoin.topic.name"),
                Consumed.with(subPatternTimeStampExtractor));

        //second join
        KStream<Long, Pattern> secondJoin = mappedFirstJoin
                .join(nodeKStream, patternNodeJoiner, JoinWindows.of(Duration.ofSeconds(10)));
        secondJoin.print(Printed.toSysOut()); // should print out final records

I'm not going to show the timestamp extractors, since I think they are irrelevant to the point.
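
As a rough mental model of what a windowed stream-stream inner join does, the following plain-Java sketch buffers records from both sides and, on every arrival, pairs the new record with each buffered record of the other side that has the same key and a timestamp within the window. This is a simplification under stated assumptions, not the Kafka Streams implementation: the class `WindowedJoinSketch`, the `Rec` type, and the string concatenation that stands in for a ValueJoiner are all hypothetical, and state stores, partitioning, and retention are left out. The timestamps mimic the producer loop above (one node and one edge every 9 seconds) with a 10-second window.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowedJoinSketch {

    // Hypothetical record: key, payload, and event timestamp in milliseconds.
    static final class Rec {
        final long key;
        final String value;
        final long timestamp;

        Rec(long key, String value, long timestamp) {
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    private final long windowMs;
    private final List<Rec> leftBuffer = new ArrayList<>();
    private final List<Rec> rightBuffer = new ArrayList<>();
    final List<String> output = new ArrayList<>();

    WindowedJoinSketch(long windowMs) {
        this.windowMs = windowMs;
    }

    // A left record arrives: remember it, then join it with every matching right record.
    void onLeft(Rec left) {
        leftBuffer.add(left);
        for (Rec right : rightBuffer) {
            if (matches(left, right)) {
                output.add(left.value + "+" + right.value);
            }
        }
    }

    // A right record arrives: remember it, then join it with every matching left record.
    void onRight(Rec right) {
        rightBuffer.add(right);
        for (Rec left : leftBuffer) {
            if (matches(left, right)) {
                output.add(left.value + "+" + right.value);
            }
        }
    }

    // Same key and timestamps at most windowMs apart.
    private boolean matches(Rec left, Rec right) {
        return left.key == right.key
                && Math.abs(left.timestamp - right.timestamp) <= windowMs;
    }

    public static void main(String[] args) {
        WindowedJoinSketch join = new WindowedJoinSketch(10_000);
        join.onLeft(new Rec(1, "node@0", 0));
        join.onRight(new Rec(1, "edge@0", 0));
        join.onLeft(new Rec(1, "node@9000", 9_000));
        join.onRight(new Rec(1, "edge@9000", 9_000));
        join.output.forEach(System.out::println);
    }
}
```

In this model, four pairs are emitted for two nodes and two edges, because records that are 9 seconds apart still fall within a 10-second window. A single record can therefore be passed to the joiner more than once.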

The Issue

So I expect the output to be a stream of Pattern records in which the list ("segments" in the Avro schema) always has the same size: one node, one edge, and another node. But this doesn't happen. Instead I get this output:

[KSTREAM-MERGE-0000000018]: 4, {"first": 1, "nextJoinID": 4, "timestamp": 1611252427338, "segments": [{"nodeID": 1, "labels": ["aString"], "properties": {"property": "aString"}, "timestamp": 1611252427338}, {"edgeID…


1 Reply


In your first ValueJoiner you create a new object:

Object[] segments = {node,edge};

In your second ValueJoiner you get the list from the input pattern and add to it. You need to copy the list before adding to it:

// your code
List<Object> segments = pattern.getSegments();
segments.add(node); // this effectively modifies the input object;
                    // if this input object joins multiple times,
                    // you may introduce an undesired side effect

// instead you should do
List<Object> segments = new LinkedList<>(pattern.getSegments());
segments.add(node);
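
The side effect can be reproduced without Kafka. The sketch below uses a hypothetical stand-in for the Avro-generated Pattern class (only the segments list is modelled) and two hypothetical helper methods, `joinMutating` and `joinCopying`, that mirror the two variants above. It assumes the same Pattern instance is handed to the joiner twice, as can happen when one record matches several records on the other side of the join.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

public class SegmentsAliasingDemo {

    // Hypothetical stand-in for the generated Pattern class.
    static final class Pattern {
        final List<Object> segments;

        Pattern(List<Object> segments) {
            this.segments = segments;
        }
    }

    // Mirrors the question's joiner: appends to the input pattern's own list.
    static Pattern joinMutating(Pattern pattern, Object node) {
        List<Object> segments = pattern.segments;
        segments.add(node);
        return new Pattern(segments);
    }

    // Mirrors the suggested fix: copies the list before appending.
    static Pattern joinCopying(Pattern pattern, Object node) {
        List<Object> segments = new LinkedList<>(pattern.segments);
        segments.add(node);
        return new Pattern(segments);
    }

    static Pattern subPattern() {
        return new Pattern(new ArrayList<>(Arrays.asList("node1", "edge1")));
    }

    public static void main(String[] args) {
        Pattern shared = subPattern();
        System.out.println(joinMutating(shared, "node4a").segments.size()); // prints 3
        System.out.println(joinMutating(shared, "node4b").segments.size()); // prints 4

        Pattern safe = subPattern();
        System.out.println(joinCopying(safe, "node4a").segments.size());    // prints 3
        System.out.println(joinCopying(safe, "node4b").segments.size());    // prints 3
    }
}
```

With the mutating variant, the second result carries the node appended by the first call as well, which matches the symptom of segment lists of different sizes. Copying the list itself is enough here because the elements are never modified.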
