Compare commits

...

4 commits

2 changed files with 97 additions and 15 deletions

View file

@ -31,6 +31,7 @@ services:
environment:
# neo4j isn't exposed to the internet so having the password checked into version control doesn't matter
- NEO4J_AUTH=neo4j/your_password
- NEO4J_PLUGINS=["graph-data-science"]
ports:
# useful for dev
- "127.0.0.1:7474:7474"

View file

@ -20,6 +20,13 @@ interface GTFSStop {
stop_url?: string;
[key: string]: string | undefined; // Allow for additional fields
}
/**
 * One row of a GTFS `stop_times.txt` file, as produced by
 * `parse_gtfs_stop_times` and consumed by `gtfs_edge_import`.
 */
interface GTFSStopTime {
/** Trip the row belongs to; `gtfs_edge_import` groups rows by this value. */
trip_id: string;
/** Raw `arrival_time` column, kept as a string (presumably GTFS `HH:MM:SS` — confirm against the feed). */
arrival_time: string;
/** Raw `departure_time` column, kept as a string (presumably GTFS `HH:MM:SS` — confirm against the feed). */
departure_time: string;
/** Stop identifier; `gtfs_edge_import` matches it against the `originalId` property of `TransportNode`s. */
stop_id: string;
/** Position of the stop within its trip, parsed from the CSV with base-10 `parseInt`; rows of a trip are sorted ascending by it. */
stop_sequence: number;
}
/**
* Sets up a Neo4j graph database with location nodes from JSON and GTFS stop data
@ -31,19 +38,24 @@ export async function graph_setup(
driver: Neo4j.Driver,
OCCT_stops_json: string = "./data/OCCT/stops.json",
BCT_GTFS_stops_txt: string = "./data/BCT/stops.txt",
BCT_GTFS_stops_times_txt: string = "./data/BCT/stop_times.txt",
): Promise<void> {
const session = driver.session();
const jsonData = await Deno.readTextFile(OCCT_stops_json);
const locationNodes: LocationNode[] = JSON.parse(jsonData);
await stops_json_node_import(session, locationNodes);
await stops_json_node_import(session, locationNodes, { provider: "OCCT" });
const BCTStopsData = await Deno.readTextFile(BCT_GTFS_stops_txt);
const BCT_stops = await parse_gtfs_stops(BCTStopsData);
await stops_gtfs_node_import(session, BCT_stops);
await stops_gtfs_node_import(session, BCT_stops, { provider: "BCT" });
const BCTStopTimesData = await Deno.readTextFile(BCT_GTFS_stops_times_txt);
const BCT_stop_times = await parse_gtfs_stop_times(BCTStopTimesData);
await gtfs_edge_import(session, BCT_stop_times);
await session.close();
}
@ -51,7 +63,9 @@ export async function graph_setup(
async function stops_json_node_import(
session: Neo4j.Session,
stops: LocationNode[],
options: { provider: string },
) {
const { provider } = options;
for (const node of stops) {
await session.run(
`
@ -60,18 +74,19 @@ async function stops_json_node_import(
n.originalId = $originalId,
n.latitude = $lat,
n.longitude = $lng,
n.source = 'OCCT'
n.source = $provider
ON MATCH SET
n.originalId = $originalId,
n.latitude = $lat,
n.longitude = $lng,
n.source = 'OCCT'
n.source = $provider
`,
{
id: `OCCT_${node.id}`,
id: `${provider}_${node.id}`,
originalId: node.id,
lat: node.lat,
lng: node.lng,
provider,
},
);
}
@ -80,6 +95,8 @@ async function stops_json_node_import(
async function stops_gtfs_node_import(
session: Neo4j.Session,
stops: GTFSStop[],
// default provider is to avoid breaking upstream
options: { provider: string },
) {
// Add GTFS stops to Neo4j
for (const stop of stops) {
@ -89,7 +106,7 @@ async function stops_gtfs_node_import(
) {
continue;
}
const { provider } = options;
// Use MERGE to update existing nodes or create new ones
await session.run(
`
@ -98,23 +115,20 @@ async function stops_gtfs_node_import(
s.name = $name,
s.latitude = $lat,
s.longitude = $lng,
s.locationType = $locationType,
s.parentStation = $parentStation,
s.zoneId = $zoneId,
s.url = $url,
s.source = 'BCT'
s.originalId = $originalId,
s.source = $provider
ON MATCH SET
s.name = $name,
s.latitude = $lat,
s.longitude = $lng,
s.locationType = $locationType,
s.parentStation = $parentStation,
s.zoneId = $zoneId,
s.url = $url,
s.source = 'BCT'
s.originalId = $originalId,
s.source = $provider
`,
{
id: "BCT_" + stop.stop_id,
id: `${provider}_` + stop.stop_id,
originalId: stop.stop_id,
name: stop.stop_name,
lat: parseFloat(stop.stop_lat),
lng: parseFloat(stop.stop_lon),
@ -122,6 +136,7 @@ async function stops_gtfs_node_import(
parentStation: stop.parent_station || null,
zoneId: stop.zone_id || null,
url: stop.stop_url || null,
provider,
},
);
}
@ -168,3 +183,69 @@ async function parse_gtfs_stops(txt: string): Promise<GTFSStop[]> {
return validStops;
}
/**
 * Parses the text of a GTFS `stop_times.txt` file into stop-time records.
 *
 * The first CSV row is treated as the header. Only the `trip_id`,
 * `arrival_time`, `departure_time`, `stop_id` and `stop_sequence` columns are
 * read; any other columns are ignored.
 *
 * Rows that cannot be used to build an edge are skipped instead of being
 * returned half-filled: a row is dropped when `trip_id` or `stop_id` is
 * missing/empty, or when `stop_sequence` does not parse to a number (a `NaN`
 * sequence would make any later sort by `stop_sequence` unreliable).
 *
 * @param fileContents Full text of the `stop_times.txt` file.
 * @returns The usable rows, in file order, with `stop_sequence` as a number.
 */
async function parse_gtfs_stop_times(
  fileContents: string,
): Promise<GTFSStopTime[]> {
  // A leading byte-order mark would otherwise become part of the first header
  // name, so that no row would expose a plain `trip_id` key.
  const text = fileContents.replace(/^\uFEFF/, "");

  // NOTE(review): `columns: true` is passed through unchanged; confirm that the
  // CSV library behind `parseCSV` accepts a boolean for this option.
  const records = await parseCSV(text, {
    skipFirstRow: true,
    columns: true,
  });

  const stopTimes: GTFSStopTime[] = [];
  for (const row of records as Record<string, string | undefined>[]) {
    const { trip_id, stop_id } = row;
    const stop_sequence = parseInt(row.stop_sequence ?? "", 10);

    if (!trip_id || !stop_id || Number.isNaN(stop_sequence)) {
      continue;
    }

    stopTimes.push({
      trip_id,
      // A missing time column is normalised to "" so the declared `string`
      // type holds at runtime.
      arrival_time: row.arrival_time ?? "",
      departure_time: row.departure_time ?? "",
      stop_id,
      stop_sequence,
    });
  }
  return stopTimes;
}
/**
 * Creates `DEPARTS_TO` relationships between consecutive stops of every trip.
 *
 * Stop times are grouped by `trip_id` and ordered by `stop_sequence`; each
 * adjacent pair (from → to) is then MERGEd as a relationship that carries the
 * trip id, the departure time at `from` and the arrival time at `to`. One
 * query is run per pair, sequentially, on the given session.
 *
 * Rows whose `stop_sequence` is `NaN` are ignored, because they would make the
 * ordering comparator unreliable and could link the wrong stops.
 *
 * @param session Neo4j session used to run the queries; it is not closed here.
 * @param stop_times Stop-time rows in any order.
 * @param options Optional settings. When `provider` is given, both endpoints
 *   must additionally have their `source` property equal to it, so that stops
 *   of another provider sharing the same `originalId` are not linked. When
 *   omitted, nodes are matched on `originalId` alone.
 */
async function gtfs_edge_import(
  session: Neo4j.Session,
  stop_times: GTFSStopTime[],
  options: { provider?: string } = {},
): Promise<void> {
  const { provider } = options;

  // Group by trip_id
  const groupedByTrip = new Map<string, GTFSStopTime[]>();
  for (const stopTime of stop_times) {
    if (Number.isNaN(stopTime.stop_sequence)) {
      continue;
    }
    const bucket = groupedByTrip.get(stopTime.trip_id);
    if (bucket) {
      bucket.push(stopTime);
    } else {
      groupedByTrip.set(stopTime.trip_id, [stopTime]);
    }
  }

  // The query text is the same for every pair, so build it once. The optional
  // filter is a constant fragment; the provider value itself is always passed
  // as a query parameter, never interpolated.
  const sourceFilter = provider === undefined ? "" : ", source: $provider";
  const query = `
    MATCH (from:TransportNode {originalId: $fromId${sourceFilter}}), (to:TransportNode {originalId: $toId${sourceFilter}})
    MERGE (from)-[:DEPARTS_TO {
      tripId: $tripId,
      departureTime: $departureTime,
      arrivalTime: $arrivalTime
    }]->(to)
  `;

  for (const [tripId, stops] of groupedByTrip) {
    // Sort by stop_sequence
    stops.sort((a, b) => a.stop_sequence - b.stop_sequence);

    for (let i = 0; i < stops.length - 1; i++) {
      const from = stops[i];
      const to = stops[i + 1];

      const params: Record<string, string> = {
        fromId: from.stop_id,
        toId: to.stop_id,
        tripId,
        departureTime: from.departure_time,
        arrivalTime: to.arrival_time,
      };
      if (provider !== undefined) {
        params.provider = provider;
      }

      await session.run(query, params);
    }
  }
}