From ce3a6a4d62d0527d8d49a366a358a4fae66c0664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Fri, 28 Mar 2025 17:18:09 +0100 Subject: [PATCH 1/2] Introduce explict endpoints for Wcc --- .../procedure_surface/api/__init__.py | 0 .../procedure_surface/api/wcc_endpoints.py | 242 ++++++++++++++++++ 2 files changed, 242 insertions(+) create mode 100644 graphdatascience/procedure_surface/api/__init__.py create mode 100644 graphdatascience/procedure_surface/api/wcc_endpoints.py diff --git a/graphdatascience/procedure_surface/api/__init__.py b/graphdatascience/procedure_surface/api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/graphdatascience/procedure_surface/api/wcc_endpoints.py b/graphdatascience/procedure_surface/api/wcc_endpoints.py new file mode 100644 index 000000000..f6bec4679 --- /dev/null +++ b/graphdatascience/procedure_surface/api/wcc_endpoints.py @@ -0,0 +1,242 @@ +from abc import ABC, abstractmethod +from typing import Any, List, Optional + +from pandas import DataFrame, Series + +from ...graph.graph_object import Graph + + +class WccEndpoints(ABC): + """ + Abstract base class defining the API for the Weakly Connected Components (WCC) algorithm. + """ + + @abstractmethod + def mutate( + self, + G: Graph, + mutate_property: str, + threshold: Optional[float] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + seed_property: Optional[str] = None, + consecutive_ids: Optional[bool] = None, + relationship_weight_property: Optional[str] = None, + ) -> Series[Any]: + """ + Executes the WCC algorithm and writes the results to the in-memory graph as node properties. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + mutate_property : str + The property name to store the component ID for each node + threshold : Optional[float], default=None + The minimum required weight to consider a relationship during traversal + relationship_types : Optional[List[str]], default=None + The relationship types to project + node_labels : Optional[List[str]], default=None + The node labels to project + sudo : Optional[bool], default=None + Run analysis with admin permission + log_progress : Optional[bool], default=None + Whether to log progress + username : Optional[str], default=None + The username to attribute the procedure run to + concurrency : Optional[Any], default=None + The number of concurrent threads + job_id : Optional[Any], default=None + An identifier for the job + seed_property : Optional[str], default=None + Defines node properties that are used as initial component identifiers + consecutive_ids : Optional[bool], default=None + Flag to decide whether component identifiers are mapped into a consecutive id space + relationship_weight_property : Optional[str], default=None + The property name that contains weight + + Returns + ------- + Series + Algorithm metrics and statistics + """ + pass + + @abstractmethod + def stats( + self, + G: Graph, + threshold: Optional[float] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + seed_property: Optional[str] = None, + consecutive_ids: Optional[bool] = None, + relationship_weight_property: Optional[str] = None, + ) -> Series[Any]: + """ + Executes the WCC algorithm and returns statistics. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + threshold : Optional[float], default=None + The minimum required weight to consider a relationship during traversal + relationship_types : Optional[List[str]], default=None + The relationship types to project + node_labels : Optional[List[str]], default=None + The node labels to project + sudo : Optional[bool], default=None + Run analysis with admin permission + log_progress : Optional[bool], default=None + Whether to log progress + username : Optional[str], default=None + The username to attribute the procedure run to + concurrency : Optional[Any], default=None + The number of concurrent threads + job_id : Optional[Any], default=None + An identifier for the job + seed_property : Optional[str], default=None + Defines node properties that are used as initial component identifiers + consecutive_ids : Optional[bool], default=None + Flag to decide whether component identifiers are mapped into a consecutive id space + relationship_weight_property : Optional[str], default=None + The property name that contains weight + + Returns + ------- + Series + Algorithm metrics and statistics + """ + pass + + @abstractmethod + def stream( + self, + G: Graph, + min_component_size: Optional[int] = None, + threshold: Optional[float] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + seed_property: Optional[str] = None, + consecutive_ids: Optional[bool] = None, + relationship_weight_property: Optional[str] = None, + ) -> DataFrame: + """ + Executes the WCC algorithm and returns a stream of results. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + min_component_size : Optional[int], default=None + Don't stream components with fewer nodes than this + threshold : Optional[float], default=None + The minimum required weight to consider a relationship during traversal + relationship_types : Optional[List[str]], default=None + The relationship types to project + node_labels : Optional[List[str]], default=None + The node labels to project + sudo : Optional[bool], default=None + Run analysis with admin permission + log_progress : Optional[bool], default=None + Whether to log progress + username : Optional[str], default=None + The username to attribute the procedure run to + concurrency : Optional[Any], default=None + The number of concurrent threads + job_id : Optional[Any], default=None + An identifier for the job + seed_property : Optional[str], default=None + Defines node properties that are used as initial component identifiers + consecutive_ids : Optional[bool], default=None + Flag to decide whether component identifiers are mapped into a consecutive id space + relationship_weight_property : Optional[str], default=None + The property name that contains weight + + Returns + ------- + DataFrame + DataFrame with the algorithm results + """ + pass + + @abstractmethod + def write( + self, + G: Graph, + write_property: str, + min_component_size: Optional[int] = None, + threshold: Optional[float] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[Any] = None, + job_id: Optional[Any] = None, + seed_property: Optional[str] = None, + consecutive_ids: Optional[bool] = None, + relationship_weight_property: Optional[str] = None, + write_concurrency: Optional[Any] = None, + write_to_result_store: Optional[bool] = None, + ) -> Series[Any]: + """ + Executes the WCC algorithm and writes the results to the Neo4j database. + + Parameters + ---------- + G : Graph + The graph to run the algorithm on + write_property : str + The property name to write component IDs to + min_component_size : Optional[int], default=None + Don't write components with fewer nodes than this + threshold : Optional[float], default=None + The minimum required weight to consider a relationship during traversal + relationship_types : Optional[List[str]], default=None + The relationship types to project + node_labels : Optional[List[str]], default=None + The node labels to project + sudo : Optional[bool], default=None + Run analysis with admin permission + log_progress : Optional[bool], default=None + Whether to log progress + username : Optional[str], default=None + The username to attribute the procedure run to + concurrency : Optional[Any], default=None + The number of concurrent threads + job_id : Optional[Any], default=None + An identifier for the job + seed_property : Optional[str], default=None + Defines node properties that are used as initial component identifiers + consecutive_ids : Optional[bool], default=None + Flag to decide whether component identifiers are mapped into a consecutive id space + relationship_weight_property : Optional[str], default=None + The property name that contains weight + write_concurrency : Optional[Any], default=None + The number of concurrent threads during the write phase + write_to_result_store : Optional[bool], default=None + Whether to write the results to the result store + + Returns + ------- + Series + Algorithm metrics and statistics + """ + pass From 3bb24d847ad2dd65de15639dd8cdc37e6b8d6136 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Max=20Kie=C3=9Fling?= Date: Fri, 28 Mar 2025 17:18:38 +0100 Subject: [PATCH 2/2] Implement Wcc endpoints using Cypher procedures --- graphdatascience/graph_data_science.py | 9 + .../procedure_surface/__init__.py | 0 .../procedure_surface/cypher/__init__.py | 0 .../cypher/wcc_proc_runner.py | 228 ++++++++++++++++++ .../cypher/test_wcc_cypher_endpoints.py | 227 +++++++++++++++++ 5 files changed, 464 insertions(+) create mode 100644 graphdatascience/procedure_surface/__init__.py create mode 100644 graphdatascience/procedure_surface/cypher/__init__.py create mode 100644 graphdatascience/procedure_surface/cypher/wcc_proc_runner.py create mode 100644 graphdatascience/tests/unit/procedure_surface/cypher/test_wcc_cypher_endpoints.py diff --git a/graphdatascience/graph_data_science.py b/graphdatascience/graph_data_science.py index 30bc2aa77..98400cba4 100644 --- a/graphdatascience/graph_data_science.py +++ b/graphdatascience/graph_data_science.py @@ -7,6 +7,9 @@ from neo4j import Driver from pandas import DataFrame +from graphdatascience.procedure_surface.api.wcc_endpoints import WccEndpoints +from graphdatascience.procedure_surface.cypher.wcc_proc_runner import WccCypherEndpoints + from .call_builder import IndirectCallBuilder from .endpoints import AlphaEndpoints, BetaEndpoints, DirectEndpoints from .error.uncallable_namespace import UncallableNamespace @@ -106,10 +109,16 @@ def __init__( self._query_runner.set_show_progress(show_progress) super().__init__(self._query_runner, namespace="gds", server_version=self._server_version) + self._wcc_endpoints = WccCypherEndpoints(self._query_runner) + @property def graph(self) -> GraphProcRunner: return GraphProcRunner(self._query_runner, f"{self._namespace}.graph", self._server_version) + @property + def wcc(self) -> WccEndpoints: + return self._wcc_endpoints + @property def util(self) -> UtilProcRunner: return UtilProcRunner(self._query_runner, f"{self._namespace}.util", self._server_version) diff --git a/graphdatascience/procedure_surface/__init__.py b/graphdatascience/procedure_surface/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/graphdatascience/procedure_surface/cypher/__init__.py b/graphdatascience/procedure_surface/cypher/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/graphdatascience/procedure_surface/cypher/wcc_proc_runner.py b/graphdatascience/procedure_surface/cypher/wcc_proc_runner.py new file mode 100644 index 000000000..eb02ff741 --- /dev/null +++ b/graphdatascience/procedure_surface/cypher/wcc_proc_runner.py @@ -0,0 +1,228 @@ +from typing import Any, List, Optional + +from pandas import DataFrame, Series + +from ...call_parameters import CallParameters +from ...graph.graph_object import Graph +from ...query_runner.query_runner import QueryRunner +from ..api.wcc_endpoints import WccEndpoints + + +class WccCypherEndpoints(WccEndpoints): + """ + Implementation of the WCC algorithm endpoints. + This class handles the actual execution by forwarding calls to the query runner. + """ + + def __init__(self, query_runner: QueryRunner): + self._query_runner = query_runner + + def mutate( + self, + G: Graph, + mutate_property: str, + threshold: Optional[float] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + seed_property: Optional[str] = None, + consecutive_ids: Optional[bool] = None, + relationship_weight_property: Optional[str] = None, + ) -> Series[Any]: + # Build configuration dictionary from parameters + config: dict[str, Any] = { + "mutateProperty": mutate_property, + } + + # Add optional parameters + if threshold is not None: + config["threshold"] = threshold + if relationship_types is not None: + config["relationshipTypes"] = relationship_types + if node_labels is not None: + config["nodeLabels"] = node_labels + if sudo is not None: + config["sudo"] = sudo + if log_progress is not None: + config["logProgress"] = log_progress + if username is not None: + config["username"] = username + if concurrency is not None: + config["concurrency"] = concurrency + if job_id is not None: + config["jobId"] = job_id + if seed_property is not None: + config["seedProperty"] = seed_property + if consecutive_ids is not None: + config["consecutiveIds"] = consecutive_ids + if relationship_weight_property is not None: + config["relationshipWeightProperty"] = relationship_weight_property + + # Run procedure and return results + params = CallParameters(graph_name=G.name(), config=config) + params.ensure_job_id_in_config() + + return self._query_runner.call_procedure(endpoint="gds.wcc.mutate", params=params).squeeze() # type: ignore + + def stats( + self, + G: Graph, + threshold: Optional[float] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + seed_property: Optional[str] = None, + consecutive_ids: Optional[bool] = None, + relationship_weight_property: Optional[str] = None, + ) -> Series[Any]: + # Build configuration dictionary from parameters + config: dict[str, Any] = {} + + # Add optional parameters + if threshold is not None: + config["threshold"] = threshold + if relationship_types is not None: + config["relationshipTypes"] = relationship_types + if node_labels is not None: + config["nodeLabels"] = node_labels + if sudo is not None: + config["sudo"] = sudo + if log_progress is not None: + config["logProgress"] = log_progress + if username is not None: + config["username"] = username + if concurrency is not None: + config["concurrency"] = concurrency + if job_id is not None: + config["jobId"] = job_id + if seed_property is not None: + config["seedProperty"] = seed_property + if consecutive_ids is not None: + config["consecutiveIds"] = consecutive_ids + if relationship_weight_property is not None: + config["relationshipWeightProperty"] = relationship_weight_property + + # Run procedure and return results + params = CallParameters(graph_name=G.name(), config=config) + params.ensure_job_id_in_config() + + return self._query_runner.call_procedure(endpoint="gds.wcc.stats", params=params).squeeze() # type: ignore + + def stream( + self, + G: Graph, + min_component_size: Optional[int] = None, + threshold: Optional[float] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + seed_property: Optional[str] = None, + consecutive_ids: Optional[bool] = None, + relationship_weight_property: Optional[str] = None, + ) -> DataFrame: + # Build configuration dictionary from parameters + config: dict[str, Any] = {} + + # Add optional parameters + if min_component_size is not None: + config["minComponentSize"] = min_component_size + if threshold is not None: + config["threshold"] = threshold + if relationship_types is not None: + config["relationshipTypes"] = relationship_types + if node_labels is not None: + config["nodeLabels"] = node_labels + if sudo is not None: + config["sudo"] = sudo + if log_progress is not None: + config["logProgress"] = log_progress + if username is not None: + config["username"] = username + if concurrency is not None: + config["concurrency"] = concurrency + if job_id is not None: + config["jobId"] = job_id + if seed_property is not None: + config["seedProperty"] = seed_property + if consecutive_ids is not None: + config["consecutiveIds"] = consecutive_ids + if relationship_weight_property is not None: + config["relationshipWeightProperty"] = relationship_weight_property + + # Run procedure and return results + params = CallParameters(graph_name=G.name(), config=config) + params.ensure_job_id_in_config() + + return self._query_runner.call_procedure(endpoint="gds.wcc.stream", params=params) + + def write( + self, + G: Graph, + write_property: str, + min_component_size: Optional[int] = None, + threshold: Optional[float] = None, + relationship_types: Optional[List[str]] = None, + node_labels: Optional[List[str]] = None, + sudo: Optional[bool] = None, + log_progress: Optional[bool] = None, + username: Optional[str] = None, + concurrency: Optional[int] = None, + job_id: Optional[str] = None, + seed_property: Optional[str] = None, + consecutive_ids: Optional[bool] = None, + relationship_weight_property: Optional[str] = None, + write_concurrency: Optional[int] = None, + write_to_result_store: Optional[bool] = None, + ) -> Series[Any]: + # Build configuration dictionary from parameters + config: dict[str, Any] = { + "writeProperty": write_property, + } + + # Add optional parameters + if min_component_size is not None: + config["minComponentSize"] = min_component_size + if threshold is not None: + config["threshold"] = threshold + if relationship_types is not None: + config["relationshipTypes"] = relationship_types + if node_labels is not None: + config["nodeLabels"] = node_labels + if sudo is not None: + config["sudo"] = sudo + if log_progress is not None: + config["logProgress"] = log_progress + if username is not None: + config["username"] = username + if concurrency is not None: + config["concurrency"] = concurrency + if job_id is not None: + config["jobId"] = job_id + if seed_property is not None: + config["seedProperty"] = seed_property + if consecutive_ids is not None: + config["consecutiveIds"] = consecutive_ids + if relationship_weight_property is not None: + config["relationshipWeightProperty"] = relationship_weight_property + if write_concurrency is not None: + config["writeConcurrency"] = write_concurrency + if write_to_result_store is not None: + config["writeToResultStore"] = write_to_result_store + + # Run procedure and return results + params = CallParameters(graph_name=G.name(), config=config) + params.ensure_job_id_in_config() + + return self._query_runner.call_procedure(endpoint="gds.wcc.write", params=params).squeeze() # type: ignore diff --git a/graphdatascience/tests/unit/procedure_surface/cypher/test_wcc_cypher_endpoints.py b/graphdatascience/tests/unit/procedure_surface/cypher/test_wcc_cypher_endpoints.py new file mode 100644 index 000000000..a15204629 --- /dev/null +++ b/graphdatascience/tests/unit/procedure_surface/cypher/test_wcc_cypher_endpoints.py @@ -0,0 +1,227 @@ +import pytest + +from graphdatascience.graph.graph_object import Graph +from graphdatascience.procedure_surface.cypher.wcc_proc_runner import WccCypherEndpoints +from graphdatascience.server_version.server_version import ServerVersion +from graphdatascience.tests.unit.conftest import CollectingQueryRunner + + +@pytest.fixture +def query_runner() -> CollectingQueryRunner: + return CollectingQueryRunner(ServerVersion(2, 16, 0)) + + +@pytest.fixture +def wcc_endpoints(query_runner: CollectingQueryRunner) -> WccCypherEndpoints: + return WccCypherEndpoints(query_runner) + + +@pytest.fixture +def graph(query_runner: CollectingQueryRunner) -> Graph: + return Graph("test_graph", query_runner) + + +def test_mutate_basic(wcc_endpoints: WccCypherEndpoints, graph: Graph, query_runner: CollectingQueryRunner) -> None: + wcc_endpoints.mutate(graph, "componentId") + + assert len(query_runner.queries) == 1 + assert "gds.wcc.mutate" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + config = params["config"] + assert config["mutateProperty"] == "componentId" + assert "jobId" in config + + +def test_mutate_with_optional_params( + wcc_endpoints: WccCypherEndpoints, graph: Graph, query_runner: CollectingQueryRunner +) -> None: + wcc_endpoints.mutate( + graph, + "componentId", + threshold=0.5, + relationship_types=["REL"], + node_labels=["Person"], + sudo=True, + log_progress=True, + username="neo4j", + concurrency=4, + job_id="test-job", + seed_property="seed", + consecutive_ids=True, + relationship_weight_property="weight", + ) + + assert len(query_runner.queries) == 1 + assert "gds.wcc.mutate" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + assert params["config"] == { + "mutateProperty": "componentId", + "threshold": 0.5, + "relationshipTypes": ["REL"], + "nodeLabels": ["Person"], + "sudo": True, + "logProgress": True, + "username": "neo4j", + "concurrency": 4, + "jobId": "test-job", + "seedProperty": "seed", + "consecutiveIds": True, + "relationshipWeightProperty": "weight", + } + + +def test_stats_basic(wcc_endpoints: WccCypherEndpoints, graph: Graph, query_runner: CollectingQueryRunner) -> None: + wcc_endpoints.stats(graph) + + assert len(query_runner.queries) == 1 + assert "gds.wcc.stats" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + config = params["config"] + assert "jobId" in config + + +def test_stats_with_optional_params( + wcc_endpoints: WccCypherEndpoints, graph: Graph, query_runner: CollectingQueryRunner +) -> None: + wcc_endpoints.stats( + graph, + threshold=0.5, + relationship_types=["REL"], + node_labels=["Person"], + sudo=True, + log_progress=True, + username="neo4j", + concurrency=4, + job_id="test-job", + seed_property="seed", + consecutive_ids=True, + relationship_weight_property="weight", + ) + + assert len(query_runner.queries) == 1 + assert "gds.wcc.stats" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + assert params["config"] == { + "threshold": 0.5, + "relationshipTypes": ["REL"], + "nodeLabels": ["Person"], + "sudo": True, + "logProgress": True, + "username": "neo4j", + "concurrency": 4, + "jobId": "test-job", + "seedProperty": "seed", + "consecutiveIds": True, + "relationshipWeightProperty": "weight", + } + + +def test_stream_basic(wcc_endpoints: WccCypherEndpoints, graph: Graph, query_runner: CollectingQueryRunner) -> None: + wcc_endpoints.stream(graph) + + assert len(query_runner.queries) == 1 + assert "gds.wcc.stream" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + config = params["config"] + assert "jobId" in config + + +def test_stream_with_optional_params( + wcc_endpoints: WccCypherEndpoints, graph: Graph, query_runner: CollectingQueryRunner +) -> None: + wcc_endpoints.stream( + graph, + min_component_size=2, + threshold=0.5, + relationship_types=["REL"], + node_labels=["Person"], + sudo=True, + log_progress=True, + username="neo4j", + concurrency=4, + job_id="test-job", + seed_property="seed", + consecutive_ids=True, + relationship_weight_property="weight", + ) + + assert len(query_runner.queries) == 1 + assert "gds.wcc.stream" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + assert params["config"] == { + "minComponentSize": 2, + "threshold": 0.5, + "relationshipTypes": ["REL"], + "nodeLabels": ["Person"], + "sudo": True, + "logProgress": True, + "username": "neo4j", + "concurrency": 4, + "jobId": "test-job", + "seedProperty": "seed", + "consecutiveIds": True, + "relationshipWeightProperty": "weight", + } + + +def test_write_basic(wcc_endpoints: WccCypherEndpoints, graph: Graph, query_runner: CollectingQueryRunner) -> None: + wcc_endpoints.write(graph, "componentId") + + assert len(query_runner.queries) == 1 + assert "gds.wcc.write" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + config = params["config"] + assert config["writeProperty"] == "componentId" + assert "jobId" in config + + +def test_write_with_optional_params( + wcc_endpoints: WccCypherEndpoints, graph: Graph, query_runner: CollectingQueryRunner +) -> None: + wcc_endpoints.write( + graph, + "componentId", + min_component_size=2, + threshold=0.5, + relationship_types=["REL"], + node_labels=["Person"], + sudo=True, + log_progress=True, + username="neo4j", + concurrency=4, + job_id="test-job", + seed_property="seed", + consecutive_ids=True, + relationship_weight_property="weight", + write_concurrency=4, + write_to_result_store=True, + ) + + assert len(query_runner.queries) == 1 + assert "gds.wcc.write" in query_runner.queries[0] + params = query_runner.params[0] + assert params["graph_name"] == "test_graph" + assert params["config"] == { + "writeProperty": "componentId", + "minComponentSize": 2, + "threshold": 0.5, + "relationshipTypes": ["REL"], + "nodeLabels": ["Person"], + "sudo": True, + "logProgress": True, + "username": "neo4j", + "concurrency": 4, + "jobId": "test-job", + "seedProperty": "seed", + "consecutiveIds": True, + "relationshipWeightProperty": "weight", + "writeConcurrency": 4, + "writeToResultStore": True, + }