Hourly Pipeline
This pipeline is an example of a standard extract, transform, load (ETL) workflow. It is scheduled to run every 3 hours, starting on 2024-04-25 at 05:45:00 (America/Halifax time). The pipeline does the following:
- access a CSV file hosted online
- perform light data wrangling
- write the result to local storage in Parquet format
This example is set up as a simple sequence of tasks, each creating an object that is used by the next task. All components of the pipeline are within the pipeline_wildfire_hourly function, which has no parameters.
#' pipeline_wildfire_hourly maestro pipeline
#'
#' @maestroFrequency 3 hour
#' @maestroStartTime 2024-04-25 05:45:00
#' @maestroTz America/Halifax
pipeline_wildfire_hourly <- function() {

  # load libraries
  library(dplyr)
  library(readr)
  library(sf)
  library(sfarrow)

  # Access active wildfire data from hosted csv
  df <- readr::read_csv("https://cwfis.cfs.nrcan.gc.ca/downloads/activefires/activefires.csv")

  # Data wrangling
  df_geom <- df |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    sf::st_as_sf(coords = c("lon", "lat"), crs = 4326)

  # Write active wildfires to file
  basename <- paste("cdn_wildfire", as.integer(Sys.time()), sep = "_")

  df_geom |>
    sfarrow::write_sf_dataset(
      "~/data/wildfires",
      format = "parquet",
      basename_template = paste0(basename, "-{i}.parquet")
    )
}
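To make the schedule concrete, the sketch below lists the first few run times implied by the tags above, using base R only. It assumes runs are anchored at the start time and repeat at a fixed 3-hour interval; how maestro itself decides which runs are due is not shown here, and the names start_time and run_times are illustrative.

```r
# Assumption: runs are anchored at the start time and repeat every 3 hours.
start_time <- as.POSIXct("2024-04-25 05:45:00", tz = "America/Halifax")

# First four run times of the 3-hour schedule
run_times <- seq(start_time, by = "3 hours", length.out = 4)

format(run_times, "%Y-%m-%d %H:%M", tz = "America/Halifax")
#> [1] "2024-04-25 05:45" "2024-04-25 08:45" "2024-04-25 11:45" "2024-04-25 14:45"
```

Because each output file name includes the integer timestamp of the run, every run in such a schedule writes a new file rather than overwriting the previous one.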
Daily Pipeline
This pipeline is an example of a standard extract, transform, load (ETL) workflow. It is scheduled to run every day, starting on 2024-04-25 at 06:30:00 (America/Halifax time). The pipeline does the following:
- submit a request to an API
- extract data from the API response
- add an insert datetime column
- write the result to local storage in Parquet format
This example defines a custom function that accesses and extracts the data from the API; its result is piped into the remaining tasks. All components of the pipeline are within the pipeline_climate_daily function, which has no parameters.
#' pipeline_climate_daily maestro pipeline
#'
#' @maestroFrequency 1 day
#' @maestroStartTime 2024-04-25 06:30:00
#' @maestroTz America/Halifax
pipeline_climate_daily <- function() {

  # load libraries
  library(dplyr)
  library(httr2)
  library(arrow)

  # Custom function for accessing api climate data
  get_hourly_climate_info <- function(station_id, request_limit = 24) {

    # Validate parameters (length first, so `&&` always receives a single value)
    stopifnot("`station_id` must be a length-one vector" = length(station_id) == 1)
    stopifnot("`station_id` must be a positive number" = is.numeric(station_id) && station_id > 0)

    # Access climate hourly via geomet api
    hourly_req <- httr2::request("https://api.weather.gc.ca/collections/climate-hourly/items") |>
      httr2::req_url_query(
        lang = "en-CA",
        offset = 0,
        CLIMATE_IDENTIFIER = station_id,
        LOCAL_DATE = Sys.Date() - 1,
        limit = request_limit
      )

    # Perform the request
    hourly_resp <- hourly_req |>
      httr2::req_perform()

    # Climate station response to data frame
    geomet_json <- hourly_resp |>
      httr2::resp_body_json(simplifyVector = TRUE)

    geomet_json$features
  }

  # Write climate hourly to file
  basename <- paste("climate_hourly", as.integer(Sys.time()), sep = "_")

  get_hourly_climate_info(8202251) |>
    dplyr::mutate(insert_datetime = Sys.time()) |>
    arrow::write_dataset(
      "~/data/climate",
      format = "parquet",
      # the template must contain "{i}", which arrow replaces with a file counter
      basename_template = paste0(basename, "-{i}.parquet")
    )
}
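The parameter checks in get_hourly_climate_info() rely on named arguments to stopifnot(), where the name becomes the error message (available in R 4.0.0 and later). The following is a minimal sketch of that pattern in isolation. The helper validate_station_id() is hypothetical and not part of the pipeline, and here the length check comes first so that `&&` always receives a single value.

```r
# Hypothetical helper (not part of the pipeline) that isolates the checks.
validate_station_id <- function(station_id) {
  # Check the length first so the `&&` below always receives a single value
  stopifnot("`station_id` must be a length-one vector" = length(station_id) == 1)
  stopifnot("`station_id` must be a positive number" = is.numeric(station_id) && station_id > 0)
  invisible(station_id)
}

# A valid station id passes silently
validate_station_id(8202251)

# An invalid input (a character string) raises an error; capture its message
msg <- tryCatch(
  validate_station_id("8202251"),
  error = function(e) conditionMessage(e)
)
msg
#> [1] "`station_id` must be a positive number"
```

Failing early with a descriptive message keeps a malformed station id from reaching the API request, where the resulting error would likely be harder to interpret.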