Skip to content

Commit 163f7da

Browse files
update (#13)
1 parent 55248ae commit 163f7da

File tree

3 files changed

+167
-31
lines changed

3 files changed

+167
-31
lines changed

pyspark_datasources/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,5 +3,6 @@
33
from .googlesheets import GoogleSheetsDataSource
44
from .huggingface import HuggingFaceDatasets
55
from .kaggle import KaggleDataSource
6+
from .opensky import OpenSkyDataSource
67
from .simplejson import SimpleJsonDataSource
78
from .stock import StockDataSource

pyspark_datasources/opensky.py

Lines changed: 148 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
Usage Example (Academic/Research):
1515
# Basic usage with region NORTH_AMERICA
1616
df = spark.readStream.format("opensky").load()
17-
17+
1818
# With specific region and authentication
1919
df = spark.readStream.format("opensky") \
2020
.option("region", "EUROPE") \
@@ -37,7 +37,7 @@
3737
When using this data in research or publications, please cite:
3838
"The OpenSky Network, https://opensky-network.org"
3939
40-
Author: Frank Munz, Databricks - Example Only, No Warranty
40+
Author: Frank Munz, Databricks - Example Only, No Warranty
4141
Purpose: Educational Example / Academic Research Tool
4242
Version: 1.0
4343
Last Updated: July-2025
@@ -56,14 +56,12 @@
5656
"Bringing Up OpenSky: A Large-scale ADS-B Sensor Network for Research".
5757
In Proceedings of the 13th IEEE/ACM International Symposium on Information Processing in Sensor Networks (IPSN), pages 83-94, April 2014.
5858
59-
6059
DISCLAIMER & LIABILITY:
6160
This code is provided "AS IS" for educational purposes only. The author and Databricks make no warranties, express or implied, and disclaim all liability for any damages, losses, or issues arising from the use of this code. Users assume full responsibility for compliance with all applicable terms of service, laws, and regulations. Use at your own risk.
6261
6362
For commercial use, contact OpenSky Network directly.
6463
================================================================================
6564
66-
6765
"""
6866

6967

@@ -106,34 +104,34 @@ class RateLimitError(OpenSkyAPIError):
106104
pass
107105

108106
class OpenSkyStreamReader(SimpleDataSourceStreamReader):
109-
107+
110108
DEFAULT_REGION = "NORTH_AMERICA"
111109
MIN_REQUEST_INTERVAL = 5.0 # seconds between requests
112110
ANONYMOUS_RATE_LIMIT = 100 # calls per day
113111
AUTHENTICATED_RATE_LIMIT = 4000 # calls per day
114112
MAX_RETRIES = 3
115113
RETRY_BACKOFF = 2
116114
RETRY_STATUS_CODES = [429, 500, 502, 503, 504]
117-
115+
118116
def __init__(self, schema: StructType, options: Dict[str, str]):
119117
super().__init__()
120118
self.schema = schema
121119
self.options = options
122120
self.session = self._create_session()
123121
self.last_request_time = 0
124-
122+
125123
region_name = options.get('region', self.DEFAULT_REGION).upper()
126124
try:
127125
self.bbox = Region[region_name].value
128126
except KeyError:
129127
print(f"Invalid region '{region_name}'. Defaulting to {self.DEFAULT_REGION}.")
130128
self.bbox = Region[self.DEFAULT_REGION].value
131-
129+
132130
self.client_id = options.get('client_id')
133131
self.client_secret = options.get('client_secret')
134132
self.access_token = None
135133
self.token_expires_at = 0
136-
134+
137135
if self.client_id and self.client_secret:
138136
self._get_access_token() # OAuth2 authentication
139137
self.rate_limit = self.AUTHENTICATED_RATE_LIMIT
@@ -145,23 +143,23 @@ def _get_access_token(self):
145143
current_time = time.time()
146144
if self.access_token and current_time < self.token_expires_at:
147145
return # Token still valid
148-
146+
149147
token_url = "https://auth.opensky-network.org/auth/realms/opensky-network/protocol/openid-connect/token"
150148
data = {
151149
"grant_type": "client_credentials",
152150
"client_id": self.client_id,
153151
"client_secret": self.client_secret
154152
}
155-
153+
156154
try:
157155
response = requests.post(token_url, data=data, timeout=10)
158156
response.raise_for_status()
159157
token_data = response.json()
160-
158+
161159
self.access_token = token_data["access_token"]
162160
expires_in = token_data.get("expires_in", 1800)
163161
self.token_expires_at = current_time + expires_in - 300
164-
162+
165163
except requests.exceptions.RequestException as e:
166164
raise OpenSkyAPIError(f"Failed to get access token: {str(e)}")
167165

@@ -185,45 +183,45 @@ def _handle_rate_limit(self):
185183
"""Ensure e MIN_REQUEST_INTERVAL seconds between requests"""
186184
current_time = time.time()
187185
time_since_last_request = current_time - self.last_request_time
188-
186+
189187
if time_since_last_request < self.MIN_REQUEST_INTERVAL:
190188
sleep_time = self.MIN_REQUEST_INTERVAL - time_since_last_request
191189
time.sleep(sleep_time)
192-
190+
193191
self.last_request_time = time.time()
194192

195193
def _fetch_states(self) -> requests.Response:
196194
"""Fetch states from OpenSky API with error handling"""
197195
self._handle_rate_limit()
198-
196+
199197
if self.client_id and self.client_secret:
200198
self._get_access_token()
201-
199+
202200
params = {
203201
'lamin': self.bbox.lamin,
204202
'lamax': self.bbox.lamax,
205203
'lomin': self.bbox.lomin,
206204
'lomax': self.bbox.lomax
207205
}
208-
206+
209207
headers = {}
210208
if self.access_token:
211209
headers['Authorization'] = f'Bearer {self.access_token}'
212-
210+
213211
try:
214212
response = self.session.get(
215213
"https://opensky-network.org/api/states/all",
216214
params=params,
217215
headers=headers,
218216
timeout=10
219217
)
220-
218+
221219
if response.status_code == 429:
222220
raise RateLimitError("API rate limit exceeded")
223221
response.raise_for_status()
224-
222+
225223
return response
226-
224+
227225
except requests.exceptions.RequestException as e:
228226
error_msg = f"API request failed: {str(e)}"
229227
if isinstance(e, requests.exceptions.Timeout):
@@ -236,7 +234,7 @@ def valid_state(self, state: List) -> bool:
236234
"""Validate state data"""
237235
if not state or len(state) < 17:
238236
return False
239-
237+
240238
return (state[0] is not None and # icao24
241239
state[5] is not None and # longitude
242240
state[6] is not None) # latitude
@@ -282,39 +280,160 @@ def safe_bool(value: Any) -> Optional[bool]:
282280
def readBetweenOffsets(self, start: Dict[str, int], end: Dict[str, int]) -> Iterator[Tuple]:
283281
data, _ = self.read(start)
284282
return iter(data)
285-
283+
286284
def read(self, start: Dict[str, int]) -> Tuple[List[Tuple], Dict[str, int]]:
287285
"""Read states with error handling and backoff"""
288286
try:
289287
response = self._fetch_states()
290288
data = response.json()
291-
289+
292290
valid_states = [
293291
self.parse_state(s, data['time'])
294292
for s in data.get('states', [])
295293
if self.valid_state(s)
296294
]
297-
295+
298296
return (
299297
valid_states,
300298
{'last_fetch': data.get('time', int(time.time()))}
301299
)
302-
300+
303301
except OpenSkyAPIError as e:
304302
print(f"OpenSky API Error: {str(e)}")
305303
return ([], start)
306304
except Exception as e:
307305
print(f"Unexpected error: {str(e)}")
308306
return ([], start)
309307

308+
310309
class OpenSkyDataSource(DataSource):
310+
"""
311+
Apache Spark DataSource for streaming real-time aircraft tracking data from OpenSky Network.
312+
313+
This data source provides access to live aircraft position, velocity, and flight data
314+
from the OpenSky Network's REST API (https://opensky-network.org/). The OpenSky Network
315+
is a community-based receiver network that collects air traffic surveillance data using
316+
ADS-B transponders and makes it available as open data for research and educational purposes.
317+
318+
The data source supports streaming aircraft state vectors including position coordinates,
319+
altitude, velocity, heading, call signs, and various flight status information for aircraft
320+
within configurable geographic regions.
321+
322+
Parameters
323+
----------
324+
options : Dict[str, str], optional
325+
Configuration options for the data source. Supported options:
326+
327+
region : str, default "NORTH_AMERICA"
328+
Geographic region to collect data from. Valid options:
329+
- "EUROPE": European airspace (35°N-72°N, 25°W-45°E)
330+
- "NORTH_AMERICA": North American airspace (7°N-72°N, 168°W-60°W)
331+
- "SOUTH_AMERICA": South American airspace (56°S-15°N, 90°W-30°W)
332+
- "ASIA": Asian airspace (10°S-82°N, 45°E-180°E)
333+
- "AUSTRALIA": Australian airspace (50°S-10°S, 110°E-180°E)
334+
- "AFRICA": African airspace (35°S-37°N, 20°W-52°E)
335+
- "GLOBAL": Worldwide coverage (90°S-90°N, 180°W-180°E)
336+
337+
client_id : str, optional
338+
OAuth2 client ID for authenticated access. Increases rate limit from
339+
100 to 4000 API calls per day. Requires corresponding client_secret.
340+
341+
client_secret : str, optional
342+
OAuth2 client secret for authenticated access. Must be provided when
343+
client_id is specified.
344+
345+
Examples
346+
--------
347+
Basic usage with default North America region:
348+
349+
>>> df = spark.readStream.format("opensky").load()
350+
>>> query = df.writeStream.format("console").start()
351+
352+
Specify a different region:
353+
354+
>>> df = spark.readStream.format("opensky") \\
355+
... .option("region", "EUROPE") \\
356+
... .load()
357+
358+
Authenticated access for higher rate limits:
359+
360+
>>> df = spark.readStream.format("opensky") \\
361+
... .option("region", "ASIA") \\
362+
... .option("client_id", "your_research_client_id") \\
363+
... .option("client_secret", "your_research_client_secret") \\
364+
... .load()
365+
366+
Process aircraft data with filtering:
367+
368+
>>> df = spark.readStream.format("opensky").load()
369+
>>> commercial_flights = df.filter(df.callsign.isNotNull() & (df.geo_altitude > 10000))
370+
>>> query = commercial_flights.writeStream.format("delta").option("path", "/tmp/flights").start()
371+
372+
Schema
373+
------
374+
The returned DataFrame contains the following columns:
375+
376+
- time_ingest (TimestampType): When the data was ingested
377+
- icao24 (StringType): Unique ICAO 24-bit address of the aircraft
378+
- callsign (StringType): Flight number or aircraft call sign
379+
- origin_country (StringType): Country where aircraft is registered
380+
- time_position (TimestampType): Last position update timestamp
381+
- last_contact (TimestampType): Last time aircraft was seen
382+
- longitude (DoubleType): Aircraft longitude in decimal degrees
383+
- latitude (DoubleType): Aircraft latitude in decimal degrees
384+
- geo_altitude (DoubleType): Aircraft altitude above sea level in meters
385+
- on_ground (BooleanType): Whether aircraft is on ground
386+
- velocity (DoubleType): Ground speed in m/s
387+
- true_track (DoubleType): Track angle in degrees (0° = north)
388+
- vertical_rate (DoubleType): Climb/descent rate in m/s
389+
- sensors (ArrayType[IntegerType]): Sensor IDs that detected aircraft
390+
- baro_altitude (DoubleType): Barometric altitude in meters
391+
- squawk (StringType): Transponder squawk code
392+
- spi (BooleanType): Special Position Identification flag
393+
- category (IntegerType): Aircraft category (0-15)
394+
395+
Rate Limits
396+
-----------
397+
- Anonymous access: 100 API calls per day
398+
- Authenticated access: 4000 API calls per day (research accounts)
399+
- Data contributors: 8000 API calls per day
400+
- Minimum 5-second interval between requests
401+
402+
Raises
403+
------
404+
ValueError
405+
If client_id is provided without client_secret, or if an invalid region is specified.
406+
407+
Notes
408+
-----
409+
- This data source is intended for academic research and educational purposes only
410+
- Commercial use requires explicit permission from OpenSky Network
411+
- Users must comply with OpenSky Network Terms of Use
412+
- All timestamps are in UTC timezone
413+
- Data may have gaps due to receiver coverage limitations
414+
- API rate limits are enforced automatically with exponential backoff
415+
416+
References
417+
----------
418+
OpenSky Network: https://opensky-network.org/
419+
API Documentation: https://opensky-network.org/apidoc/
420+
Terms of Use: https://opensky-network.org/about/terms-of-use
421+
422+
Citation
423+
--------
424+
When using this data in research, please cite:
425+
Matthias Schäfer, Martin Strohmeier, Vincent Lenders, Ivan Martinovic and Matthias Wilhelm.
426+
"Bringing Up OpenSky: A Large-scale ADS-B Sensor Network for Research".
427+
In Proceedings of the 13th IEEE/ACM International Symposium on Information Processing
428+
in Sensor Networks (IPSN), pages 83-94, April 2014.
429+
"""
311430
def __init__(self, options: Dict[str, str] = None):
312431
super().__init__(options or {})
313432
self.options = options or {}
314-
433+
315434
if 'client_id' in self.options and not self.options.get('client_secret'):
316435
raise ValueError("client_secret must be provided when client_id is set")
317-
436+
318437
if 'region' in self.options and self.options['region'].upper() not in Region.__members__:
319438
raise ValueError(f"Invalid region. Must be one of: {', '.join(Region.__members__.keys())}")
320439

@@ -346,5 +465,3 @@ def schema(self) -> StructType:
346465

347466
def simpleStreamReader(self, schema: StructType) -> OpenSkyStreamReader:
348467
return OpenSkyStreamReader(schema, self.options)
349-
350-
spark.dataSource.register(OpenSkyDataSource)

tests/test_data_sources.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,21 @@ def test_kaggle_datasource(spark):
4646
df.show()
4747
assert df.count() == 891
4848
assert len(df.columns) == 12
49+
50+
51+
def test_opensky_datasource_stream(spark):
52+
spark.dataSource.register(OpenSkyDataSource)
53+
(
54+
spark.readStream.format("opensky")
55+
.option("region", "EUROPE")
56+
.load()
57+
.writeStream.format("memory")
58+
.queryName("opensky_result")
59+
.trigger(once=True)
60+
.start()
61+
.awaitTermination()
62+
)
63+
result = spark.sql("SELECT * FROM opensky_result")
64+
result.show()
65+
assert len(result.columns) == 18 # Check schema has expected number of fields
66+
assert result.count() > 0 # Verify we got some data

0 commit comments

Comments
 (0)