Compare commits
No commits in common. "390e2ae57d588dca86cff5e1ca8a8a5d7bee3ee0" and "c39cbab3ddb165d4dfd90ca814569cf79f999dc6" have entirely different histories.
390e2ae57d
...
c39cbab3dd
2 changed files with 15 additions and 97 deletions
|
@ -31,7 +31,6 @@ services:
|
|||
environment:
|
||||
# neo4j isn't exposed to the internet so having the password checked into version control doesn't matter
|
||||
- NEO4J_AUTH=neo4j/your_password
|
||||
- NEO4J_PLUGINS=["graph-data-science"]
|
||||
ports:
|
||||
# useful for dev
|
||||
- "127.0.0.1:7474:7474"
|
||||
|
|
|
@ -20,13 +20,6 @@ interface GTFSStop {
|
|||
stop_url?: string;
|
||||
[key: string]: string | undefined; // Allow for additional fields
|
||||
}
|
||||
/**
 * One row of a GTFS `stop_times.txt` file, reduced to the fields the
 * graph import reads.
 */
interface GTFSStopTime {
  /** Identifier of the trip this stop visit belongs to. */
  trip_id: string;
  /** Arrival time exactly as it appears in the feed (kept as text). */
  arrival_time: string;
  /** Departure time exactly as it appears in the feed (kept as text). */
  departure_time: string;
  /** Identifier of the stop being visited. */
  stop_id: string;
  /** Position of this visit within its trip. */
  stop_sequence: number;
}
|
||||
|
||||
/**
|
||||
* Sets up a Neo4j graph database with location nodes from JSON and GTFS stop data
|
||||
|
@ -38,24 +31,19 @@ export async function graph_setup(
|
|||
driver: Neo4j.Driver,
|
||||
OCCT_stops_json: string = "./data/OCCT/stops.json",
|
||||
BCT_GTFS_stops_txt: string = "./data/BCT/stops.txt",
|
||||
BCT_GTFS_stops_times_txt: string = "./data/BCT/stop_times.txt",
|
||||
): Promise<void> {
|
||||
const session = driver.session();
|
||||
|
||||
const jsonData = await Deno.readTextFile(OCCT_stops_json);
|
||||
const locationNodes: LocationNode[] = JSON.parse(jsonData);
|
||||
|
||||
await stops_json_node_import(session, locationNodes, { provider: "OCCT" });
|
||||
await stops_json_node_import(session, locationNodes);
|
||||
|
||||
const BCTStopsData = await Deno.readTextFile(BCT_GTFS_stops_txt);
|
||||
|
||||
const BCT_stops = await parse_gtfs_stops(BCTStopsData);
|
||||
|
||||
await stops_gtfs_node_import(session, BCT_stops, { provider: "BCT" });
|
||||
|
||||
const BCTStopTimesData = await Deno.readTextFile(BCT_GTFS_stops_times_txt);
|
||||
const BCT_stop_times = await parse_gtfs_stop_times(BCTStopTimesData);
|
||||
await gtfs_edge_import(session, BCT_stop_times);
|
||||
await stops_gtfs_node_import(session, BCT_stops);
|
||||
|
||||
await session.close();
|
||||
}
|
||||
|
@ -63,9 +51,7 @@ export async function graph_setup(
|
|||
async function stops_json_node_import(
|
||||
session: Neo4j.Session,
|
||||
stops: LocationNode[],
|
||||
options: { provider: string },
|
||||
) {
|
||||
const { provider } = options;
|
||||
for (const node of stops) {
|
||||
await session.run(
|
||||
`
|
||||
|
@ -74,19 +60,18 @@ async function stops_json_node_import(
|
|||
n.originalId = $originalId,
|
||||
n.latitude = $lat,
|
||||
n.longitude = $lng,
|
||||
n.source = $provider
|
||||
n.source = 'OCCT'
|
||||
ON MATCH SET
|
||||
n.originalId = $originalId,
|
||||
n.latitude = $lat,
|
||||
n.longitude = $lng,
|
||||
n.source = $provider
|
||||
n.source = 'OCCT'
|
||||
`,
|
||||
{
|
||||
id: `${provider}_${node.id}`,
|
||||
id: `OCCT_${node.id}`,
|
||||
originalId: node.id,
|
||||
lat: node.lat,
|
||||
lng: node.lng,
|
||||
provider,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
@ -95,8 +80,6 @@ async function stops_json_node_import(
|
|||
async function stops_gtfs_node_import(
|
||||
session: Neo4j.Session,
|
||||
stops: GTFSStop[],
|
||||
// default provider is to avoid breaking upstream
|
||||
options: { provider: string },
|
||||
) {
|
||||
// Add GTFS stops to Neo4j
|
||||
for (const stop of stops) {
|
||||
|
@ -106,7 +89,7 @@ async function stops_gtfs_node_import(
|
|||
) {
|
||||
continue;
|
||||
}
|
||||
const { provider } = options;
|
||||
|
||||
// Use MERGE to update existing nodes or create new ones
|
||||
await session.run(
|
||||
`
|
||||
|
@ -115,20 +98,23 @@ async function stops_gtfs_node_import(
|
|||
s.name = $name,
|
||||
s.latitude = $lat,
|
||||
s.longitude = $lng,
|
||||
s.locationType = $locationType,
|
||||
s.parentStation = $parentStation,
|
||||
s.zoneId = $zoneId,
|
||||
s.url = $url,
|
||||
s.originalId = $originalId,
|
||||
s.source = $provider
|
||||
s.source = 'BCT'
|
||||
ON MATCH SET
|
||||
s.name = $name,
|
||||
s.latitude = $lat,
|
||||
s.longitude = $lng,
|
||||
s.locationType = $locationType,
|
||||
s.parentStation = $parentStation,
|
||||
s.zoneId = $zoneId,
|
||||
s.url = $url,
|
||||
s.originalId = $originalId,
|
||||
s.source = $provider
|
||||
s.source = 'BCT'
|
||||
`,
|
||||
{
|
||||
id: `${provider}_` + stop.stop_id,
|
||||
originalId: stop.stop_id,
|
||||
id: "BCT_" + stop.stop_id,
|
||||
name: stop.stop_name,
|
||||
lat: parseFloat(stop.stop_lat),
|
||||
lng: parseFloat(stop.stop_lon),
|
||||
|
@ -136,7 +122,6 @@ async function stops_gtfs_node_import(
|
|||
parentStation: stop.parent_station || null,
|
||||
zoneId: stop.zone_id || null,
|
||||
url: stop.stop_url || null,
|
||||
provider,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
@ -183,69 +168,3 @@ async function parse_gtfs_stops(txt: string): Promise<GTFSStop[]> {
|
|||
|
||||
return validStops;
|
||||
}
|
||||
|
||||
/**
 * Parses the text of a GTFS `stop_times.txt` file into stop-time records.
 *
 * Rows that cannot be used to build an edge are skipped rather than
 * returned: a row is dropped when its `trip_id` or `stop_id` is empty, or
 * when `stop_sequence` does not parse as an integer (which would otherwise
 * surface as `NaN` and make any later sort on it unreliable).
 *
 * @param fileContents Raw CSV text, header row included.
 * @returns One record per usable data row, in the order the parser yields them.
 */
async function parse_gtfs_stop_times(
  fileContents: string,
): Promise<GTFSStopTime[]> {
  // NOTE(review): `skipFirstRow` and `columns: true` look like options from
  // two different CSV libraries — confirm against the `parseCSV` import that
  // rows really come back keyed by header name.
  const records = await parseCSV(fileContents, {
    skipFirstRow: true,
    columns: true,
  });

  const stopTimes: GTFSStopTime[] = [];

  for (const row of records as Record<string, string>[]) {
    const stopSequence = parseInt(row.stop_sequence, 10);

    // Skip malformed rows instead of emitting records with empty ids or NaN.
    if (!row.trip_id || !row.stop_id || Number.isNaN(stopSequence)) {
      continue;
    }

    stopTimes.push({
      trip_id: row.trip_id,
      arrival_time: row.arrival_time,
      departure_time: row.departure_time,
      stop_id: row.stop_id,
      stop_sequence: stopSequence,
    });
  }

  return stopTimes;
}
|
||||
|
||||
/**
 * Creates `DEPARTS_TO` relationships between consecutive stops of each trip.
 *
 * Stop times are grouped by `trip_id`, each group is ordered by
 * `stop_sequence`, and one relationship is merged for every adjacent pair.
 * The relationship carries the trip id, the departure time of the earlier
 * stop and the arrival time of the later stop. A trip with a single stop
 * time produces no relationship.
 *
 * Queries are issued one at a time and awaited in order.
 *
 * @param session Open Neo4j session the queries are run on; not closed here.
 * @param stop_times Stop-time records to link; the array itself is not reordered.
 */
async function gtfs_edge_import(
  session: Neo4j.Session,
  stop_times: GTFSStopTime[],
): Promise<void> {
  // The query text is loop-invariant, so it is built once.
  // NOTE(review): endpoints are matched on `originalId` alone, not on the
  // provider-prefixed `id` or on `source` — confirm that ids cannot collide
  // across providers, otherwise this would also link the other provider's nodes.
  const mergeEdgeQuery = `
    MATCH (from:TransportNode {originalId: $fromId}), (to:TransportNode {originalId: $toId})
    MERGE (from)-[:DEPARTS_TO {
      tripId: $tripId,
      departureTime: $departureTime,
      arrivalTime: $arrivalTime
    }]->(to)
  `;

  const groupedByTrip = new Map<string, GTFSStopTime[]>();

  // Group by trip_id
  for (const stopTime of stop_times) {
    const tripStops = groupedByTrip.get(stopTime.trip_id);
    if (tripStops) {
      tripStops.push(stopTime);
    } else {
      groupedByTrip.set(stopTime.trip_id, [stopTime]);
    }
  }

  for (const [tripId, stops] of groupedByTrip.entries()) {
    // Sort by stop_sequence (sorts the per-trip array built above, not the input)
    stops.sort((a, b) => a.stop_sequence - b.stop_sequence);

    for (let i = 0; i < stops.length - 1; i++) {
      const from = stops[i];
      const to = stops[i + 1];

      await session.run(mergeEdgeQuery, {
        fromId: from.stop_id,
        toId: to.stop_id,
        tripId,
        departureTime: from.departure_time,
        arrivalTime: to.arrival_time,
      });
    }
  }
}
|
||||
|
|
Loading…
Reference in a new issue