Compare commits
No commits in common. "390e2ae57d588dca86cff5e1ca8a8a5d7bee3ee0" and "c39cbab3ddb165d4dfd90ca814569cf79f999dc6" have entirely different histories.
390e2ae57d
...
c39cbab3dd
2 changed files with 15 additions and 97 deletions
|
@ -31,7 +31,6 @@ services:
|
||||||
environment:
|
environment:
|
||||||
# neo4j isn't exposed to the internet so having the password checked into version control doesn't matter
|
# neo4j isn't exposed to the internet so having the password checked into version control doesn't matter
|
||||||
- NEO4J_AUTH=neo4j/your_password
|
- NEO4J_AUTH=neo4j/your_password
|
||||||
- NEO4J_PLUGINS=["graph-data-science"]
|
|
||||||
ports:
|
ports:
|
||||||
# useful for dev
|
# useful for dev
|
||||||
- "127.0.0.1:7474:7474"
|
- "127.0.0.1:7474:7474"
|
||||||
|
|
|
@ -20,13 +20,6 @@ interface GTFSStop {
|
||||||
stop_url?: string;
|
stop_url?: string;
|
||||||
[key: string]: string | undefined; // Allow for additional fields
|
[key: string]: string | undefined; // Allow for additional fields
|
||||||
}
|
}
|
||||||
interface GTFSStopTime {
|
|
||||||
trip_id: string;
|
|
||||||
arrival_time: string;
|
|
||||||
departure_time: string;
|
|
||||||
stop_id: string;
|
|
||||||
stop_sequence: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets up a Neo4j graph database with location nodes from JSON and GTFS stop data
|
* Sets up a Neo4j graph database with location nodes from JSON and GTFS stop data
|
||||||
|
@ -38,24 +31,19 @@ export async function graph_setup(
|
||||||
driver: Neo4j.Driver,
|
driver: Neo4j.Driver,
|
||||||
OCCT_stops_json: string = "./data/OCCT/stops.json",
|
OCCT_stops_json: string = "./data/OCCT/stops.json",
|
||||||
BCT_GTFS_stops_txt: string = "./data/BCT/stops.txt",
|
BCT_GTFS_stops_txt: string = "./data/BCT/stops.txt",
|
||||||
BCT_GTFS_stops_times_txt: string = "./data/BCT/stop_times.txt",
|
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const session = driver.session();
|
const session = driver.session();
|
||||||
|
|
||||||
const jsonData = await Deno.readTextFile(OCCT_stops_json);
|
const jsonData = await Deno.readTextFile(OCCT_stops_json);
|
||||||
const locationNodes: LocationNode[] = JSON.parse(jsonData);
|
const locationNodes: LocationNode[] = JSON.parse(jsonData);
|
||||||
|
|
||||||
await stops_json_node_import(session, locationNodes, { provider: "OCCT" });
|
await stops_json_node_import(session, locationNodes);
|
||||||
|
|
||||||
const BCTStopsData = await Deno.readTextFile(BCT_GTFS_stops_txt);
|
const BCTStopsData = await Deno.readTextFile(BCT_GTFS_stops_txt);
|
||||||
|
|
||||||
const BCT_stops = await parse_gtfs_stops(BCTStopsData);
|
const BCT_stops = await parse_gtfs_stops(BCTStopsData);
|
||||||
|
|
||||||
await stops_gtfs_node_import(session, BCT_stops, { provider: "BCT" });
|
await stops_gtfs_node_import(session, BCT_stops);
|
||||||
|
|
||||||
const BCTStopTimesData = await Deno.readTextFile(BCT_GTFS_stops_times_txt);
|
|
||||||
const BCT_stop_times = await parse_gtfs_stop_times(BCTStopTimesData);
|
|
||||||
await gtfs_edge_import(session, BCT_stop_times);
|
|
||||||
|
|
||||||
await session.close();
|
await session.close();
|
||||||
}
|
}
|
||||||
|
@ -63,9 +51,7 @@ export async function graph_setup(
|
||||||
async function stops_json_node_import(
|
async function stops_json_node_import(
|
||||||
session: Neo4j.Session,
|
session: Neo4j.Session,
|
||||||
stops: LocationNode[],
|
stops: LocationNode[],
|
||||||
options: { provider: string },
|
|
||||||
) {
|
) {
|
||||||
const { provider } = options;
|
|
||||||
for (const node of stops) {
|
for (const node of stops) {
|
||||||
await session.run(
|
await session.run(
|
||||||
`
|
`
|
||||||
|
@ -74,19 +60,18 @@ async function stops_json_node_import(
|
||||||
n.originalId = $originalId,
|
n.originalId = $originalId,
|
||||||
n.latitude = $lat,
|
n.latitude = $lat,
|
||||||
n.longitude = $lng,
|
n.longitude = $lng,
|
||||||
n.source = $provider
|
n.source = 'OCCT'
|
||||||
ON MATCH SET
|
ON MATCH SET
|
||||||
n.originalId = $originalId,
|
n.originalId = $originalId,
|
||||||
n.latitude = $lat,
|
n.latitude = $lat,
|
||||||
n.longitude = $lng,
|
n.longitude = $lng,
|
||||||
n.source = $provider
|
n.source = 'OCCT'
|
||||||
`,
|
`,
|
||||||
{
|
{
|
||||||
id: `${provider}_${node.id}`,
|
id: `OCCT_${node.id}`,
|
||||||
originalId: node.id,
|
originalId: node.id,
|
||||||
lat: node.lat,
|
lat: node.lat,
|
||||||
lng: node.lng,
|
lng: node.lng,
|
||||||
provider,
|
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -95,8 +80,6 @@ async function stops_json_node_import(
|
||||||
async function stops_gtfs_node_import(
|
async function stops_gtfs_node_import(
|
||||||
session: Neo4j.Session,
|
session: Neo4j.Session,
|
||||||
stops: GTFSStop[],
|
stops: GTFSStop[],
|
||||||
// default provider is to avoid breaking upstream
|
|
||||||
options: { provider: string },
|
|
||||||
) {
|
) {
|
||||||
// Add GTFS stops to Neo4j
|
// Add GTFS stops to Neo4j
|
||||||
for (const stop of stops) {
|
for (const stop of stops) {
|
||||||
|
@ -106,7 +89,7 @@ async function stops_gtfs_node_import(
|
||||||
) {
|
) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
const { provider } = options;
|
|
||||||
// Use MERGE to update existing nodes or create new ones
|
// Use MERGE to update existing nodes or create new ones
|
||||||
await session.run(
|
await session.run(
|
||||||
`
|
`
|
||||||
|
@ -115,20 +98,23 @@ async function stops_gtfs_node_import(
|
||||||
s.name = $name,
|
s.name = $name,
|
||||||
s.latitude = $lat,
|
s.latitude = $lat,
|
||||||
s.longitude = $lng,
|
s.longitude = $lng,
|
||||||
|
s.locationType = $locationType,
|
||||||
|
s.parentStation = $parentStation,
|
||||||
|
s.zoneId = $zoneId,
|
||||||
s.url = $url,
|
s.url = $url,
|
||||||
s.originalId = $originalId,
|
s.source = 'BCT'
|
||||||
s.source = $provider
|
|
||||||
ON MATCH SET
|
ON MATCH SET
|
||||||
s.name = $name,
|
s.name = $name,
|
||||||
s.latitude = $lat,
|
s.latitude = $lat,
|
||||||
s.longitude = $lng,
|
s.longitude = $lng,
|
||||||
|
s.locationType = $locationType,
|
||||||
|
s.parentStation = $parentStation,
|
||||||
|
s.zoneId = $zoneId,
|
||||||
s.url = $url,
|
s.url = $url,
|
||||||
s.originalId = $originalId,
|
s.source = 'BCT'
|
||||||
s.source = $provider
|
|
||||||
`,
|
`,
|
||||||
{
|
{
|
||||||
id: `${provider}_` + stop.stop_id,
|
id: "BCT_" + stop.stop_id,
|
||||||
originalId: stop.stop_id,
|
|
||||||
name: stop.stop_name,
|
name: stop.stop_name,
|
||||||
lat: parseFloat(stop.stop_lat),
|
lat: parseFloat(stop.stop_lat),
|
||||||
lng: parseFloat(stop.stop_lon),
|
lng: parseFloat(stop.stop_lon),
|
||||||
|
@ -136,7 +122,6 @@ async function stops_gtfs_node_import(
|
||||||
parentStation: stop.parent_station || null,
|
parentStation: stop.parent_station || null,
|
||||||
zoneId: stop.zone_id || null,
|
zoneId: stop.zone_id || null,
|
||||||
url: stop.stop_url || null,
|
url: stop.stop_url || null,
|
||||||
provider,
|
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
@ -183,69 +168,3 @@ async function parse_gtfs_stops(txt: string): Promise<GTFSStop[]> {
|
||||||
|
|
||||||
return validStops;
|
return validStops;
|
||||||
}
|
}
|
||||||
|
|
||||||
async function parse_gtfs_stop_times(
|
|
||||||
fileContents: string,
|
|
||||||
): Promise<GTFSStopTime[]> {
|
|
||||||
const records = await parseCSV(fileContents, {
|
|
||||||
skipFirstRow: true,
|
|
||||||
columns: true,
|
|
||||||
});
|
|
||||||
|
|
||||||
const stopTimes: GTFSStopTime[] = [];
|
|
||||||
|
|
||||||
for (const row of records as Record<string, string>[]) {
|
|
||||||
stopTimes.push({
|
|
||||||
trip_id: row.trip_id,
|
|
||||||
arrival_time: row.arrival_time,
|
|
||||||
departure_time: row.departure_time,
|
|
||||||
stop_id: row.stop_id,
|
|
||||||
stop_sequence: parseInt(row.stop_sequence, 10),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
return stopTimes;
|
|
||||||
}
|
|
||||||
|
|
||||||
async function gtfs_edge_import(
|
|
||||||
session: neo4j.Session,
|
|
||||||
stop_times: GTFSStopTime[],
|
|
||||||
): Promise<void> {
|
|
||||||
const groupedByTrip = new Map<string, GTFSStopTime[]>();
|
|
||||||
|
|
||||||
// Group by trip_id
|
|
||||||
for (const stopTime of stop_times) {
|
|
||||||
if (!groupedByTrip.has(stopTime.trip_id)) {
|
|
||||||
groupedByTrip.set(stopTime.trip_id, []);
|
|
||||||
}
|
|
||||||
groupedByTrip.get(stopTime.trip_id)!.push(stopTime);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const [tripId, stops] of groupedByTrip.entries()) {
|
|
||||||
// Sort by stop_sequence
|
|
||||||
stops.sort((a, b) => a.stop_sequence - b.stop_sequence);
|
|
||||||
|
|
||||||
for (let i = 0; i < stops.length - 1; i++) {
|
|
||||||
const from = stops[i];
|
|
||||||
const to = stops[i + 1];
|
|
||||||
|
|
||||||
await session.run(
|
|
||||||
`
|
|
||||||
MATCH (from:TransportNode {originalId: $fromId}), (to:TransportNode {originalId: $toId})
|
|
||||||
MERGE (from)-[:DEPARTS_TO {
|
|
||||||
tripId: $tripId,
|
|
||||||
departureTime: $departureTime,
|
|
||||||
arrivalTime: $arrivalTime
|
|
||||||
}]->(to)
|
|
||||||
`,
|
|
||||||
{
|
|
||||||
fromId: from.stop_id,
|
|
||||||
toId: to.stop_id,
|
|
||||||
tripId,
|
|
||||||
departureTime: from.departure_time,
|
|
||||||
arrivalTime: to.arrival_time,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in a new issue