|
2 | 2 | # -*- coding: utf-8 -*
|
3 | 3 |
|
4 | 4 | import json
|
| 5 | +import sqlite3 |
5 | 6 | import sys
|
6 | 7 | import urllib.parse
|
7 | 8 |
|
8 | 9 | import click
|
| 10 | +import pandas as pd |
9 | 11 | import requests
|
10 | 12 |
|
11 | 13 |
|
12 |
| -def geocodage(verbose, address): |
13 |
| - try: |
14 |
| - if verbose: |
15 |
| - print("[+] Geocoding address...") |
16 |
| - address = urllib.parse.quote(address) |
17 |
| - r = requests.get('https://api-adresse.data.gouv.fr/search/?q=' + address) |
# The decorator is called with parentheses on purpose: that form is accepted by
# every click release, whereas the bare ``@click.group`` spelling is only
# understood by recent ones (8.1+).
@click.group()
def cli():
    """
    Geocoding addresses using the national address database API - https://adresse.data.gouv.fr/ - BAN
    """
| 17 | + |
| 18 | + |
@click.command(name="geo")
@click.option('--address', '-a', help='Address to geocode', type=str, required=True)
@click.option('--limit', '-l', default=0, help='Number of results for each geocoding.', show_default=True, type=int)
@click.option('--output-csv', '-csv', type=click.Path(writable=True),
              help='Path to CSV file where results will be saved.')
@click.option('--include-header', '-hdr', is_flag=True, default=False, show_default=True,
              help='Include header row in the CSV output.')
@click.option('--include-index', '-idx', is_flag=True, default=False, show_default=True,
              help='Include DataFrame index in the CSV / SQL output.')
@click.option('--sqlite', '-d', type=click.Path(writable=True),
              help='Path to SQLite database file where results will be saved.')
@click.option('--table-name', '-t', type=str, default="data", show_default=True,
              help='Name of the table to insert data into in the SQLite database.')
@click.option('--mode', '-m', type=click.Choice(['fail', 'replace', 'append'], case_sensitive=False), default="append",
              show_default=True, help='How to behave if the file already exists.')
@click.option('--verbose', '-v', is_flag=True, help="More information displayed.")
def geocoding(address, limit, output_csv, sqlite, table_name, include_header, mode, include_index, verbose):
    """
    Geocoding a single address.
    """
    # Parameters (all supplied by the click options above):
    #   address        - address string handed to perform_geocoding
    #   limit          - forwarded unchanged to perform_geocoding
    #   output_csv     - CSV destination, or None to skip the CSV export
    #   sqlite         - SQLite destination, or None to skip the SQL export
    #   table_name     - target table for the SQLite export
    #   include_header - write a header row in the CSV
    #   include_index  - write the DataFrame index (CSV and SQL)
    #   mode           - 'fail' / 'replace' / 'append'
    #   verbose        - print progress and a preview of the result
    geocoded = perform_geocoding(address, limit, verbose)

    # Handle the "no result" case before anything touches the frame:
    # perform_geocoding returns None then, and indexing None would raise.
    if geocoded is None:
        click.echo(f'No results found for {address}')
        return

    if verbose:
        print("-------------------------------------------------------------")
        print(geocoded[["geometry_coordinates", "properties_label"]])
        print("-------------------------------------------------------------")

    if output_csv is not None:
        # Translate the --mode choice into the matching file-open mode;
        # anything other than fail/replace appends.
        csvmode = {"fail": "x", "replace": "w"}.get(mode, "a")
        export_to_csv(geocoded, file=output_csv, mode=csvmode, header=include_header, index=include_index,
                      verbose=verbose)
    if sqlite is not None:
        # The SQL export receives the --mode value as-is.
        export_to_sqlite(geocoded, database=sqlite, table=table_name, mode=mode, index=include_index,
                         verbose=verbose)
| 59 | + |
| 60 | + |
@click.command(name="file")
@click.option('--input-file', '-i', help='Addresses file to geocode', required=True, type=click.Path(exists=True))
@click.option('--limit', '-l', default=0, help='Number of results for each geocoding.', show_default=True, type=int)
@click.option('--output-csv', '-csv', type=click.Path(writable=True),
              help='Path to CSV file where results will be saved.')
@click.option('--include-header', '-hdr', is_flag=True, default=False, show_default=True,
              help='Include header row in the CSV output.')
@click.option('--include-index', '-idx', is_flag=True, default=False, show_default=True,
              help='Include DataFrame index in the CSV / SQL output.')
@click.option('--sqlite', '-d', type=click.Path(writable=True),
              help='Path to SQLite database file where results will be saved.')
@click.option('--table-name', '-t', type=str, default="data", show_default=True,
              help='Name of the table to insert data into in the SQLite database.')
@click.option('--mode', '-m', type=click.Choice(['fail', 'replace', 'append'], case_sensitive=False), default="append",
              show_default=True, help='How to behave if the file already exists.')
@click.option('--verbose', '-v', is_flag=True, help="More information displayed.")
def geocoding_from_file(input_file, limit, output_csv, sqlite, table_name, include_header, mode, include_index,
                        verbose):
    """
    Geocoding addresses from file.
    """
    # The input file is read line by line and every non-blank line is geocoded
    # as one address. The combined results are returned as a DataFrame (empty
    # when nothing could be geocoded) and optionally exported to CSV / SQLite.
    if verbose:
        print("[+] Reading file {}".format(input_file))

    frames = []
    with open(input_file, "r") as f:
        for line in f:
            # Drop the trailing newline so it is not sent as part of the
            # query, and ignore blank lines altogether.
            address = line.strip()
            if not address:
                continue
            result = perform_geocoding(address, limit, verbose)
            if result is not None:
                frames.append(result)

    # Concatenate once at the end; pd.concat rejects an empty list, hence the
    # explicit fallback to an empty frame.
    geocoded = pd.concat(frames) if frames else pd.DataFrame()

    if geocoded.empty:
        # Nothing to preview or export; the preview columns would not exist.
        click.echo(f'No results found in {input_file}')
        return geocoded

    if verbose:
        print("-------------------------------------------------------------")
        print(geocoded[["geometry_coordinates", "properties_label"]])
        print("-------------------------------------------------------------")

    if output_csv is not None:
        # Translate the --mode choice into the matching file-open mode;
        # anything other than fail/replace appends.
        csvmode = {"fail": "x", "replace": "w"}.get(mode, "a")
        export_to_csv(geocoded, file=output_csv, mode=csvmode, header=include_header, index=include_index,
                      verbose=verbose)
    if sqlite is not None:
        export_to_sqlite(geocoded, database=sqlite, table=table_name, mode=mode, index=include_index, verbose=verbose)
    return geocoded
| 104 | + |
| 105 | + |
def perform_geocoding(address, limit, verbose, timeout=10):
    """
    Geocode one address through the api-adresse.data.gouv.fr search endpoint.

    :param address: free-text address; surrounding whitespace is removed
        before it is URL-quoted and sent.
    :param limit: maximum number of rows to keep from the response. Values
        below 1 (including the CLI default of 0) keep a single row.
    :param verbose: when true, print the address being geocoded.
    :param timeout: seconds to wait for the HTTP request before giving up.
    :return: a DataFrame holding the flattened ``features`` of the response
        (nested keys joined with ``_``), or ``None`` when the address is
        blank, the request fails, the status code is not 200, the body is not
        valid JSON, or no feature is returned.
    """
    address = address.strip()
    if verbose:
        print("[+] Geocoding address : {}".format(address))
    if not address:
        # Nothing to look up; avoid a pointless request.
        return None

    query = urllib.parse.quote(address)
    try:
        r = requests.get('https://api-adresse.data.gouv.fr/search/?q=' + query, timeout=timeout)
    except requests.RequestException as e:
        # Report network problems instead of crashing with a traceback.
        print("[!] Error : {}".format(e))
        return None
    if r.status_code != 200:
        return None

    try:
        json_data = json.loads(r.text)
    except ValueError:
        return None

    # Decide "no result" from the parsed payload rather than from the length
    # of the response body, which is a fragile proxy.
    features = json_data.get('features') if isinstance(json_data, dict) else None
    if not features:
        return None

    df = pd.json_normalize(
        features,
        sep='_',
        record_path=None,
        meta=[
            'type',
            ['properties', 'label'],
            ['properties', 'score'],
            ['properties', 'housenumber'],
            ['properties', 'id'],
            ['properties', 'name'],
            ['properties', 'postcode'],
            ['properties', 'citycode'],
            ['properties', 'x'],
            ['properties', 'y'],
            ['properties', 'city'],
            ['properties', 'context'],
            ['properties', 'type'],
            ['properties', 'importance'],
            ['properties', 'street'],
            'geometry.type',
            ['geometry', 'coordinates']
        ],
        errors='ignore'
    )
    # head() is positional and exclusive, so exactly `limit` rows are kept
    # (a label slice such as .loc[:limit] would include one row too many).
    geocoded = df.head(max(limit, 1)).copy()
    return geocoded
| 142 | + |
| 143 | + |
def export_to_csv(data, file, mode, header, index, verbose):
    """
    Export data to a CSV file.

    :param data: DataFrame to write.
    :param file: destination path; ``.csv`` is appended when missing.
    :param mode: file-open mode handed to ``DataFrame.to_csv``.
    :param header: whether to write the header row.
    :param index: whether to write the DataFrame index.
    :param verbose: when true, print a confirmation once the file is written.
    """
    if not file.endswith(".csv"):
        file = "{}.csv".format(file)
    data.to_csv(file, index=index, mode=mode, header=header)
    # Report success only after the write has actually completed, so a failed
    # write is never announced as successful.
    if verbose:
        print('[+] Data exported successfully to "{}".'.format(file))
| 153 | + |
| 154 | + |
def export_to_sqlite(data, database, table, mode, index, verbose):
    """
    Export data to an SQLite database.

    :param data: DataFrame to insert; it is left untouched.
    :param database: database file path; ``.db`` is appended when missing.
    :param table: name of the destination table.
    :param mode: passed to ``DataFrame.to_sql`` as ``if_exists``.
    :param index: whether to write the DataFrame index.
    :param verbose: when true, print a confirmation once the insert is done.
    :raises SystemExit: with status 1 when ``to_sql`` raises ``ValueError``.
    """
    if not database.endswith(".db"):
        database = '{}.db'.format(database)

    # Work on a copy so the caller's frame keeps its original list values.
    frame = data.copy()
    # Convert any list in the DataFrame to a string representation
    for column in frame.columns:
        if frame[column].apply(lambda x: isinstance(x, list)).any():
            # Here we join the list items into a string separated by a comma
            frame[column] = frame[column].apply(lambda x: ','.join(map(str, x)) if isinstance(x, list) else x)

    conn = sqlite3.connect(database)
    try:
        frame.to_sql(con=conn, name=table, if_exists=mode, index=index)
    except ValueError as e:
        print("[!] Error : {}".format(e))
        sys.exit(1)
    finally:
        conn.close()

    # Announce success only once the insert has really happened.
    if verbose:
        print('[+] Data exported successfully to table "{}" in database "{}".'.format(table, database))
| 177 | + |
71 | 178 |
|
# Attach both sub-commands to the top-level group, in declaration order.
for _subcommand in (geocoding, geocoding_from_file):
    cli.add_command(_subcommand)
72 | 181 |
|
73 | 182 | if __name__ == '__main__':
|
74 | 183 | if len(sys.argv) == 1:
|
|
0 commit comments