Compare commits
4 commits
c39cbab3dd
...
390e2ae57d
Author | SHA1 | Date | |
---|---|---|---|
|
390e2ae57d | ||
|
c6f5e8245a | ||
|
a7a7abd901 | ||
|
9118e59b3a |
2 changed files with 97 additions and 15 deletions
|
@ -31,6 +31,7 @@ services:
|
||||||
environment:
|
environment:
|
||||||
# neo4j isn't exposed to the internet so having the password checked into version control doesn't matter
|
# neo4j isn't exposed to the internet so having the password checked into version control doesn't matter
|
||||||
- NEO4J_AUTH=neo4j/your_password
|
- NEO4J_AUTH=neo4j/your_password
|
||||||
|
- NEO4J_PLUGINS=["graph-data-science"]
|
||||||
ports:
|
ports:
|
||||||
# useful for dev
|
# useful for dev
|
||||||
- "127.0.0.1:7474:7474"
|
- "127.0.0.1:7474:7474"
|
||||||
|
|
|
@ -20,6 +20,13 @@ interface GTFSStop {
|
||||||
stop_url?: string;
|
stop_url?: string;
|
||||||
[key: string]: string | undefined; // Allow for additional fields
|
[key: string]: string | undefined; // Allow for additional fields
|
||||||
}
|
}
|
||||||
|
interface GTFSStopTime {
|
||||||
|
trip_id: string;
|
||||||
|
arrival_time: string;
|
||||||
|
departure_time: string;
|
||||||
|
stop_id: string;
|
||||||
|
stop_sequence: number;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets up a Neo4j graph database with location nodes from JSON and GTFS stop data
|
* Sets up a Neo4j graph database with location nodes from JSON and GTFS stop data
|
||||||
|
@ -31,19 +38,24 @@ export async function graph_setup(
|
||||||
driver: Neo4j.Driver,
|
driver: Neo4j.Driver,
|
||||||
OCCT_stops_json: string = "./data/OCCT/stops.json",
|
OCCT_stops_json: string = "./data/OCCT/stops.json",
|
||||||
BCT_GTFS_stops_txt: string = "./data/BCT/stops.txt",
|
BCT_GTFS_stops_txt: string = "./data/BCT/stops.txt",
|
||||||
|
BCT_GTFS_stops_times_txt: string = "./data/BCT/stop_times.txt",
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const session = driver.session();
|
const session = driver.session();
|
||||||
|
|
||||||
const jsonData = await Deno.readTextFile(OCCT_stops_json);
|
const jsonData = await Deno.readTextFile(OCCT_stops_json);
|
||||||
const locationNodes: LocationNode[] = JSON.parse(jsonData);
|
const locationNodes: LocationNode[] = JSON.parse(jsonData);
|
||||||
|
|
||||||
await stops_json_node_import(session, locationNodes);
|
await stops_json_node_import(session, locationNodes, { provider: "OCCT" });
|
||||||
|
|
||||||
const BCTStopsData = await Deno.readTextFile(BCT_GTFS_stops_txt);
|
const BCTStopsData = await Deno.readTextFile(BCT_GTFS_stops_txt);
|
||||||
|
|
||||||
const BCT_stops = await parse_gtfs_stops(BCTStopsData);
|
const BCT_stops = await parse_gtfs_stops(BCTStopsData);
|
||||||
|
|
||||||
await stops_gtfs_node_import(session, BCT_stops);
|
await stops_gtfs_node_import(session, BCT_stops, { provider: "BCT" });
|
||||||
|
|
||||||
|
const BCTStopTimesData = await Deno.readTextFile(BCT_GTFS_stops_times_txt);
|
||||||
|
const BCT_stop_times = await parse_gtfs_stop_times(BCTStopTimesData);
|
||||||
|
await gtfs_edge_import(session, BCT_stop_times);
|
||||||
|
|
||||||
await session.close();
|
await session.close();
|
||||||
}
|
}
|
||||||
|
@ -51,7 +63,9 @@ export async function graph_setup(
|
||||||
async function stops_json_node_import(
|
async function stops_json_node_import(
|
||||||
session: Neo4j.Session,
|
session: Neo4j.Session,
|
||||||
stops: LocationNode[],
|
stops: LocationNode[],
|
||||||
|
options: { provider: string },
|
||||||
) {
|
) {
|
||||||
|
const { provider } = options;
|
||||||
for (const node of stops) {
|
for (const node of stops) {
|
||||||
await session.run(
|
await session.run(
|
||||||
`
|
`
|
||||||
|
@ -60,18 +74,19 @@ async function stops_json_node_import(
|
||||||
n.originalId = $originalId,
|
n.originalId = $originalId,
|
||||||
n.latitude = $lat,
|
n.latitude = $lat,
|
||||||
n.longitude = $lng,
|
n.longitude = $lng,
|
||||||
n.source = 'OCCT'
|
n.source = $provider
|
||||||
ON MATCH SET
|
ON MATCH SET
|
||||||
n.originalId = $originalId,
|
n.originalId = $originalId,
|
||||||
n.latitude = $lat,
|
n.latitude = $lat,
|
||||||
n.longitude = $lng,
|
n.longitude = $lng,
|
||||||
n.source = 'OCCT'
|
n.source = $provider
|
||||||
`,
|
`,
|
||||||
{
|
{
|
||||||
id: `OCCT_${node.id}`,
|
id: `${provider}_${node.id}`,
|
||||||
originalId: node.id,
|
originalId: node.id,
|
||||||
lat: node.lat,
|
lat: node.lat,
|
||||||
lng: node.lng,
|
lng: node.lng,
|
||||||
|
provider,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -80,6 +95,8 @@ async function stops_json_node_import(
|
||||||
async function stops_gtfs_node_import(
|
async function stops_gtfs_node_import(
|
||||||
session: Neo4j.Session,
|
session: Neo4j.Session,
|
||||||
stops: GTFSStop[],
|
stops: GTFSStop[],
|
||||||
|
// default provider is to avoid breaking upstream
|
||||||
|
options: { provider: string },
|
||||||
) {
|
) {
|
||||||
// Add GTFS stops to Neo4j
|
// Add GTFS stops to Neo4j
|
||||||
for (const stop of stops) {
|
for (const stop of stops) {
|
||||||
|
@ -89,7 +106,7 @@ async function stops_gtfs_node_import(
|
||||||
) {
|
) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
const { provider } = options;
|
||||||
// Use MERGE to update existing nodes or create new ones
|
// Use MERGE to update existing nodes or create new ones
|
||||||
await session.run(
|
await session.run(
|
||||||
`
|
`
|
||||||
|
@ -98,23 +115,20 @@ async function stops_gtfs_node_import(
|
||||||
s.name = $name,
|
s.name = $name,
|
||||||
s.latitude = $lat,
|
s.latitude = $lat,
|
||||||
s.longitude = $lng,
|
s.longitude = $lng,
|
||||||
s.locationType = $locationType,
|
|
||||||
s.parentStation = $parentStation,
|
|
||||||
s.zoneId = $zoneId,
|
|
||||||
s.url = $url,
|
s.url = $url,
|
||||||
s.source = 'BCT'
|
s.originalId = $originalId,
|
||||||
|
s.source = $provider
|
||||||
ON MATCH SET
|
ON MATCH SET
|
||||||
s.name = $name,
|
s.name = $name,
|
||||||
s.latitude = $lat,
|
s.latitude = $lat,
|
||||||
s.longitude = $lng,
|
s.longitude = $lng,
|
||||||
s.locationType = $locationType,
|
|
||||||
s.parentStation = $parentStation,
|
|
||||||
s.zoneId = $zoneId,
|
|
||||||
s.url = $url,
|
s.url = $url,
|
||||||
s.source = 'BCT'
|
s.originalId = $originalId,
|
||||||
|
s.source = $provider
|
||||||
`,
|
`,
|
||||||
{
|
{
|
||||||
id: "BCT_" + stop.stop_id,
|
id: `${provider}_` + stop.stop_id,
|
||||||
|
originalId: stop.stop_id,
|
||||||
name: stop.stop_name,
|
name: stop.stop_name,
|
||||||
lat: parseFloat(stop.stop_lat),
|
lat: parseFloat(stop.stop_lat),
|
||||||
lng: parseFloat(stop.stop_lon),
|
lng: parseFloat(stop.stop_lon),
|
||||||
|
@ -122,6 +136,7 @@ async function stops_gtfs_node_import(
|
||||||
parentStation: stop.parent_station || null,
|
parentStation: stop.parent_station || null,
|
||||||
zoneId: stop.zone_id || null,
|
zoneId: stop.zone_id || null,
|
||||||
url: stop.stop_url || null,
|
url: stop.stop_url || null,
|
||||||
|
provider,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -168,3 +183,69 @@ async function parse_gtfs_stops(txt: string): Promise<GTFSStop[]> {
|
||||||
|
|
||||||
return validStops;
|
return validStops;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async function parse_gtfs_stop_times(
|
||||||
|
fileContents: string,
|
||||||
|
): Promise<GTFSStopTime[]> {
|
||||||
|
const records = await parseCSV(fileContents, {
|
||||||
|
skipFirstRow: true,
|
||||||
|
columns: true,
|
||||||
|
});
|
||||||
|
|
||||||
|
const stopTimes: GTFSStopTime[] = [];
|
||||||
|
|
||||||
|
for (const row of records as Record<string, string>[]) {
|
||||||
|
stopTimes.push({
|
||||||
|
trip_id: row.trip_id,
|
||||||
|
arrival_time: row.arrival_time,
|
||||||
|
departure_time: row.departure_time,
|
||||||
|
stop_id: row.stop_id,
|
||||||
|
stop_sequence: parseInt(row.stop_sequence, 10),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
return stopTimes;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function gtfs_edge_import(
|
||||||
|
session: neo4j.Session,
|
||||||
|
stop_times: GTFSStopTime[],
|
||||||
|
): Promise<void> {
|
||||||
|
const groupedByTrip = new Map<string, GTFSStopTime[]>();
|
||||||
|
|
||||||
|
// Group by trip_id
|
||||||
|
for (const stopTime of stop_times) {
|
||||||
|
if (!groupedByTrip.has(stopTime.trip_id)) {
|
||||||
|
groupedByTrip.set(stopTime.trip_id, []);
|
||||||
|
}
|
||||||
|
groupedByTrip.get(stopTime.trip_id)!.push(stopTime);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const [tripId, stops] of groupedByTrip.entries()) {
|
||||||
|
// Sort by stop_sequence
|
||||||
|
stops.sort((a, b) => a.stop_sequence - b.stop_sequence);
|
||||||
|
|
||||||
|
for (let i = 0; i < stops.length - 1; i++) {
|
||||||
|
const from = stops[i];
|
||||||
|
const to = stops[i + 1];
|
||||||
|
|
||||||
|
await session.run(
|
||||||
|
`
|
||||||
|
MATCH (from:TransportNode {originalId: $fromId}), (to:TransportNode {originalId: $toId})
|
||||||
|
MERGE (from)-[:DEPARTS_TO {
|
||||||
|
tripId: $tripId,
|
||||||
|
departureTime: $departureTime,
|
||||||
|
arrivalTime: $arrivalTime
|
||||||
|
}]->(to)
|
||||||
|
`,
|
||||||
|
{
|
||||||
|
fromId: from.stop_id,
|
||||||
|
toId: to.stop_id,
|
||||||
|
tripId,
|
||||||
|
departureTime: from.departure_time,
|
||||||
|
arrivalTime: to.arrival_time,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue