Pushing to the Timeseries Writer (BDP)
This page covers the write side of a transformer that produces time series measurements, using the go-bdp-client SDK. For the shared transformer skeleton (the SDK listener, environment variables, containerization, and the local development workflow), see Developing a Data Transformer from Scratch.
1. Authentication for the Timeseries Writer
The Timeseries Writer API uses OAuth2 for authentication. Transformers obtain an access token (via Keycloak, "Client Credentials" flow) to make authenticated requests.
We provide shared OAuth client credentials for development and testing purposes only:
- Token Endpoint: https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token
- Client ID: odh-mobility-datacollector-development
- Client Secret: 7bd46f8f-c296-416d-a13d-dc81e68d0830
The odh-mobility-datacollector-development client is authorized to write data to the Timeseries Writer.
These credentials are strictly for testing and development. For production deployments, request dedicated OAuth client credentials from the Open Data Hub team.
When using the go-bdp-client SDK, authentication is largely transparent. You only configure the relevant environment variables:
- BDP_TOKEN_URL: The URL of the OAuth2 token endpoint.
- BDP_CLIENT_ID: The client ID for your transformer.
- BDP_CLIENT_SECRET: The client secret for your transformer.
These BDP_* names apply to go-bdp-client v1.4 and later, which the boilerplate uses. Collectors or transformers pinned to go-bdp-client v1.3 or earlier read the equivalent ODH_TOKEN_URL, ODH_CLIENT_ID, and ODH_CLIENT_SECRET instead.
The SDK (specifically bdplib.FromEnv()) automatically fetches an access token, refreshes it before it expires, and sets the Authorization: Bearer header on every request to the Timeseries Writer.
For debugging, you can acquire a token manually:
curl -X POST \
  "https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token" \
  -H "Content-Type: application/x-www-form-urlencoded" \
  -d "grant_type=client_credentials&client_id=odh-mobility-datacollector-development&client_secret=7bd46f8f-c296-416d-a13d-dc81e68d0830"
Then include the returned access_token in the Authorization header, for example against /json/stations:
curl -X GET "http://localhost:8999/json/stations" \
  --header 'Content-Type: application/json' \
  --header 'Authorization: Bearer YOUR_ACCESS_TOKEN'
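For debugging from Go rather than from the shell, the two curl commands map onto plain net/http requests. The sketch below only builds the requests and does not send them; the helper names `tokenForm`, `newTokenRequest`, and `newAuthorizedGet` are hypothetical, and the client ID and secret in `main` are placeholders.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// tokenForm builds the form body for an OAuth2 client-credentials request.
func tokenForm(clientID, clientSecret string) string {
	form := url.Values{}
	form.Set("grant_type", "client_credentials")
	form.Set("client_id", clientID)
	form.Set("client_secret", clientSecret)
	return form.Encode() // Encode emits the keys in sorted order
}

// newTokenRequest mirrors the first curl command (POST to the token endpoint).
func newTokenRequest(tokenURL, clientID, clientSecret string) (*http.Request, error) {
	body := strings.NewReader(tokenForm(clientID, clientSecret))
	req, err := http.NewRequest(http.MethodPost, tokenURL, body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	return req, nil
}

// newAuthorizedGet mirrors the second curl command (GET with a bearer token).
func newAuthorizedGet(endpoint, accessToken string) (*http.Request, error) {
	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+accessToken)
	return req, nil
}

func main() {
	tokenReq, err := newTokenRequest(
		"https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token",
		"my-client-id", "my-client-secret") // placeholder credentials
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(tokenReq.Method, tokenReq.URL.Host)

	getReq, err := newAuthorizedGet("http://localhost:8999/json/stations", "YOUR_ACCESS_TOKEN")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(getReq.Method, getReq.Header.Get("Authorization"))
}
```

Passing each request to `http.DefaultClient.Do` would execute it; the token response is JSON, and the `access_token` field is the value to place after `Bearer`.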
2. Initializing the BDP client
A time series transformer initializes a bdplib client and synchronizes its data types on startup, then plugs the client into the shared listener:
func main() {
	ms.InitWithEnv(context.Background(), "", &env)
	slog.Info("Starting data transformer...")
	b := bdplib.FromEnv() // initialize the BDP client from BDP_* env vars
	defer tel.FlushOnPanic()

	// Register the data types this transformer will publish (once, on startup).
	dataTypeList := bdplib.NewDataTypeList(nil)
	err := dataTypeList.Load("datatypes.json")
	ms.FailOnError(context.Background(), err, "could not load datatypes")
	b.SyncDataTypes(dataTypeList.All())

	// The BDP client is injected into the handler via a small wrapper.
	listener := tr.NewTr[Forecast](context.Background(), env)
	err = listener.Start(context.Background(), TransformWithBdp(b))
	ms.FailOnError(context.Background(), err, "error while listening to queue")
}

// TransformWithBdp adapts the Transform logic to the SDK's tr.Handler signature,
// injecting the BDP client.
func TransformWithBdp(bdp bdplib.Bdp) tr.Handler[Forecast] {
	return func(ctx context.Context, payload *rdb.Raw[Forecast]) error {
		return Transform(ctx, bdp, payload)
	}
}
- bdplib.FromEnv() reads the BDP_* environment variables to configure the base URL, provenance details, and OAuth2 authentication.
- b.SyncDataTypes(...) registers all data types this transformer can publish, so the target understands the metrics. It is typically called once on startup.
- TransformWithBdp is an adapter: tr.Handler expects func(ctx, *rdb.Raw[T]) error, so the wrapper injects the bdplib.Bdp client into your Transform.
3. The Transform function
This is the core business logic: map the raw data into stations and measurements, then push them.
Each transformer implements its own logic; the example below (off-street parking) is illustrative.
- Go Code (main.go) - Transform Function
const (
	stationTypeParent     = "ParkingFacility" // Parent station type
	stationType           = "ParkingStation"  // Child station type
	dataTypeFreeTotal     = "free"
	dataTypeOccupiedTotal = "occupied"
)

// Transform converts raw parking data into BDP stations and measurements.
func Transform(ctx context.Context, bdp bdplib.Bdp, payload *rdb.Raw[FacilityData]) error {
	log := logger.Get(ctx)
	var parentStations []bdplib.Station
	stations := make(map[string]bdplib.Station) // child stations by ID
	dataMapParent := bdp.CreateDataMap()
	dataMap := bdp.CreateDataMap()
	ts := payload.Timestamp.UnixMilli()

	for _, facility := range payload.Rawdata {
		// 1. Parent station + metadata
		id := facility.GetID()
		parent_station_data := station_proto.GetStationByID(strconv.Itoa(id))
		if parent_station_data == nil {
			log.Error("no parent station data", "facility_id", strconv.Itoa(id))
			panic("no parent station data")
		}
		parentStation := bdplib.CreateStation(
			parent_station_data.ID, parent_station_data.Name, stationTypeParent,
			parent_station_data.Lat, parent_station_data.Lon, bdp.GetOrigin())
		parentStation.MetaData = parent_station_data.ToMetadata()
		parentStations = append(parentStations, parentStation)

		// 2. Child stations + measurements
		for _, freePlace := range facility.FacilityDetails {
			facility_id := strconv.Itoa(facility.GetID()) + "_" + strconv.Itoa(freePlace.ParkNo)
			station_data := station_proto.GetStationByID(facility_id)
			if station_data == nil {
				log.Error("no station data", "facility_id", facility_id)
				panic("no station data")
			}
			station, ok := stations[facility_id]
			if !ok {
				station = bdplib.CreateStation(
					station_data.ID, station_data.Name, stationType,
					station_data.Lat, station_data.Lon, bdp.GetOrigin())
				station.ParentStation = parentStation.Id // link child to parent
				station.MetaData = station_data.ToMetadata()
				stations[station_data.ID] = station
			}
			dataMap.AddRecord(station_data.ID, dataTypeFreeTotal, bdplib.CreateRecord(ts, freePlace.FreePlaces, 600))
			dataMap.AddRecord(station_data.ID, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, freePlace.CurrentLevel, 600))
		}
	}

	// 3. Batch sync stations, then push measurements
	bdp.SyncStations(stationTypeParent, parentStations, true, true)
	bdp.SyncStations(stationType, values(stations), true, true)
	bdp.PushData(stationTypeParent, dataMapParent)
	bdp.PushData(stationType, dataMap)
	return nil
}
Critical points and patterns:
- Hierarchical stations: a parent station (ParkingFacility) groups child stations (ParkingStation); station.ParentStation = parentStation.Id establishes the relationship.
- bdplib.CreateStation(id, name, type, lat, lon, origin) builds a station; assign MetaData to enrich it.
- bdplib.CreateRecord(ts, value, period) creates a measurement record (ts is Unix milliseconds; period is the aggregation interval in seconds).
- bdp.SyncStations(type, stations, syncState, onlyActivate): syncState=true updates active/inactive state based on presence in the call; onlyActivate=true only activates existing stations (use false if new stations can appear dynamically).
- bdp.PushData(type, dataMap) pushes all collected measurements for a station type in one batch call.
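Transform calls a values helper that the excerpt above does not define. A minimal sketch follows, assuming the helper simply collects the map's values into a slice. Sorting by key is an addition made here so that repeated runs produce the same order; the original helper may not sort.

```go
package main

import (
	"fmt"
	"sort"
)

// values returns the values of m ordered by key. Go randomizes map iteration
// order, so sorting the keys first keeps the resulting slice stable across runs.
func values[V any](m map[string]V) []V {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	out := make([]V, 0, len(m))
	for _, k := range keys {
		out = append(out, m[k])
	}
	return out
}

func main() {
	// Station names keyed by child station ID (illustrative data only).
	stations := map[string]string{
		"7_2": "Level 2",
		"7_1": "Level 1",
	}
	fmt.Println(values(stations))
}
```

A stable order is not required by the writer, but it makes golden-file comparisons in tests easier to reason about.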
The dto.go file defines the Go structs for the raw input schema (for example FacilityData); this is the type passed to tr.NewTr[FacilityData].
4. Deployment with Helm
The Helm values configure the image and the environment, with the OAuth credentials loaded from Kubernetes secrets.
- Helm Chart Values (Conceptual)
image:
  repository: ghcr.io/noi-techpark/opendatahub-transformers/tr-parking-offstreet-skidata
  pullPolicy: IfNotPresent
  tag: "0.0.1"
env:
  LOG_LEVEL: "INFO"
  MQ_QUEUE: s3-poller.parking-offstreet-skidata
  MQ_EXCHANGE: routed
  MQ_KEY: s3-poller.parking-offstreet-skidata
  MQ_CLIENT: tr-parking-offstreet-skidata
  RAW_DATA_BRIDGE_ENDPOINT: "http://raw-data-bridge-service.default.svc.cluster.local:2000/"
  BDP_BASE_URL: https://share.opendatahub.testingmachine.eu
  BDP_PROVENANCE_VERSION: 0.1.0
  BDP_PROVENANCE_NAME: tr-parking-offstreet-skidata
  BDP_ORIGIN: province-bolzano
  SERVICE_NAME: tr-parking-offstreet-skidata
  TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317
envSecretRef:
  - name: MQ_URI
    secret: rabbitmq-svcbind
    key: uri
  - name: BDP_TOKEN_URL
    secret: odh-oauth-client-credentials
    key: token_url
  - name: BDP_CLIENT_ID
    secret: odh-oauth-client-credentials
    key: client_id
  - name: BDP_CLIENT_SECRET
    secret: odh-oauth-client-credentials
    key: client_secret
For production, BDP_CLIENT_ID, BDP_CLIENT_SECRET, and BDP_TOKEN_URL should always be loaded from Kubernetes secrets via envSecretRef, never committed to Git.
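Since part of the configuration arrives from plain values and part from secrets, a missing secret reference is easy to overlook. The sketch below is a hypothetical startup check, not part of the SDK: the `missingEnv` helper is invented for this example and reports which of the BDP_* variables named on this page are unset or empty.

```go
package main

import (
	"fmt"
	"os"
)

// missingEnv returns the names that are unset or empty according to lookup.
// The lookup function is injected so the check can run against a fake
// environment in tests; os.LookupEnv satisfies the signature.
func missingEnv(lookup func(string) (string, bool), names ...string) []string {
	var missing []string
	for _, name := range names {
		if value, ok := lookup(name); !ok || value == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

func main() {
	required := []string{
		"BDP_BASE_URL",
		"BDP_TOKEN_URL",
		"BDP_CLIENT_ID",
		"BDP_CLIENT_SECRET",
	}
	if missing := missingEnv(os.LookupEnv, required...); len(missing) > 0 {
		fmt.Println("missing environment variables:", missing)
		return
	}
	fmt.Println("configuration looks complete")
}
```

Running such a check before the listener starts turns a misconfigured deployment into an immediate, readable failure instead of an authentication error on the first push.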
5. Testing with bdpmock
go-bdp-client ships bdpmock, which fully mocks the bdplib.Bdp interface so you can test the Transform function without a live writer. The pattern feeds a golden input file, runs Transform against the mock, and compares the recorded calls with an expected-output file:
func TestMyBestParking(t *testing.T) {
	var in = FacilityData{}
	station_proto = ReadStations("../resources/stations.csv")
	bdpmock.LoadInputData(&in, "../testdata/input/mybestparking.json")
	timestamp, _ := time.Parse("2006-01-02", "2025-01-01")
	raw := rdb.Raw[FacilityData]{Rawdata: in, Timestamp: timestamp}

	var out = bdpmock.BdpMockCalls{}
	bdpmock.LoadOutput(&out, "../testdata/output/mybestparking--out.json")

	b := bdpmock.MockFromEnv() // mock instead of bdplib.FromEnv()
	err := Transform(context.TODO(), b, &raw)
	require.Nil(t, err)

	req := b.(*bdpmock.BdpMock).Requests()
	NormalizeBdpMockCalls(&req) // sort slices for order-independent comparison
	testsuite.DeepEqualFromFile(t, out, req)
}
- bdpmock.MockFromEnv() returns a mock bdplib.Bdp that records every SyncStations, SyncDataTypes, and PushData call instead of sending it.
- Input/output golden files (testdata/input/*.json, testdata/output/*.json) make the test deterministic: the output file is the exact set of bdplib calls the transformer is expected to make.
- NormalizeBdpMockCalls sorts the recorded slices so that testsuite.DeepEqualFromFile comparisons are order-independent.
Related
- Developing a Data Transformer from Scratch (the shared skeleton)
- Pushing to the Content API
- SDKs