
Pushing to the Timeseries Writer (BDP)

This page covers the write side of a transformer that produces time series measurements, using the go-bdp-client SDK. For the shared transformer skeleton (the SDK listener, environment variables, containerization, and the local development workflow), see Developing a Data Transformer from Scratch.

1. Authentication for the Timeseries Writer

The Timeseries Writer API uses OAuth2 for authentication. Transformers obtain an access token from Keycloak using the OAuth2 Client Credentials flow and include it in every request to the writer.

We provide shared OAuth client credentials for development and testing purposes only:

  • Token Endpoint: https://auth.opendatahub.testingmachine.eu/auth/realms/noi/protocol/openid-connect/token
  • Client ID: odh-mobility-datacollector-development
  • Client Secret: 7bd46f8f-c296-416d-a13d-dc81e68d0830

The odh-mobility-datacollector-development client is authorized to write data to the Timeseries Writer.

Important

These credentials are strictly for testing and development. For production deployments, request dedicated OAuth client credentials from the Open Data Hub team.

When using the go-bdp-client SDK, authentication is largely transparent. You only configure the relevant environment variables:

  • BDP_TOKEN_URL: The URL of the OAuth2 token endpoint.
  • BDP_CLIENT_ID: The client ID for your transformer.
  • BDP_CLIENT_SECRET: The client secret for your transformer.

info

These BDP_* names apply to go-bdp-client v1.4 and later, which the boilerplate uses. Collectors or transformers pinned to go-bdp-client v1.3 or earlier read the equivalent ODH_TOKEN_URL, ODH_CLIENT_ID, and ODH_CLIENT_SECRET instead.
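If you want a transformer to fail fast with a readable message when one of these variables is missing, a small startup check is enough. The following is a hypothetical helper, not part of go-bdp-client: missingEnv and legacyName are illustrative names, and the legacy hint simply applies the BDP_* to ODH_* renaming described in the note above.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// requiredBdpVars are the authentication variables go-bdp-client v1.4+ reads.
var requiredBdpVars = []string{"BDP_TOKEN_URL", "BDP_CLIENT_ID", "BDP_CLIENT_SECRET"}

// missingEnv returns the names whose value is empty or blank. getenv is injected
// (os.Getenv in production) so the check can be unit-tested.
func missingEnv(getenv func(string) string, names []string) []string {
	var missing []string
	for _, name := range names {
		if strings.TrimSpace(getenv(name)) == "" {
			missing = append(missing, name)
		}
	}
	return missing
}

// legacyName returns the ODH_* name that go-bdp-client v1.3 and earlier read
// instead of the given BDP_* name.
func legacyName(name string) string {
	return "ODH_" + strings.TrimPrefix(name, "BDP_")
}

func main() {
	for _, name := range missingEnv(os.Getenv, requiredBdpVars) {
		msg := "missing " + name
		if os.Getenv(legacyName(name)) != "" {
			// Likely an old deployment: the v1.3-era name is set, the v1.4+ name is not.
			msg += fmt.Sprintf(" (but %s is set; rename it for go-bdp-client v1.4+)", legacyName(name))
		}
		fmt.Println(msg)
	}
}
```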

The SDK (specifically bdplib.FromEnv()) automatically fetches an access token, refreshes it before it expires, and sets the Authorization: Bearer header on every request to the Timeseries Writer.

2. Initializing the BDP client

A time series transformer initializes a bdplib client and synchronizes its data types on startup, then plugs the client into the shared listener:

```go
func main() {
	ctx := context.Background()
	ms.InitWithEnv(ctx, "", &env)
	slog.Info("Starting data transformer...")
	b := bdplib.FromEnv() // initialize the BDP client from the BDP_* env vars

	defer tel.FlushOnPanic()

	// Register the data types this transformer will publish (once, on startup).
	dataTypeList := bdplib.NewDataTypeList(nil)
	err := dataTypeList.Load("datatypes.json")
	ms.FailOnError(ctx, err, "could not load datatypes")
	err = b.SyncDataTypes(dataTypeList.All())
	ms.FailOnError(ctx, err, "could not sync datatypes")

	// The BDP client is injected into the handler via a small wrapper.
	// The type parameter must match the payload type Transform expects.
	listener := tr.NewTr[FacilityData](ctx, env)
	err = listener.Start(ctx, TransformWithBdp(b))
	ms.FailOnError(ctx, err, "error while listening to queue")
}

// TransformWithBdp adapts the Transform logic to the SDK's tr.Handler signature,
// injecting the BDP client.
func TransformWithBdp(bdp bdplib.Bdp) tr.Handler[FacilityData] {
	return func(ctx context.Context, payload *rdb.Raw[FacilityData]) error {
		return Transform(ctx, bdp, payload)
	}
}
```

  • bdplib.FromEnv() reads the BDP_* environment variables to configure the base URL, provenance details, and OAuth2 authentication.
  • b.SyncDataTypes(...) registers every data type this transformer can publish, so the Timeseries Writer knows them before any measurement references them. It is typically called once on startup.
  • TransformWithBdp is an adapter: tr.Handler expects func(ctx, *rdb.Raw[T]) error, so the wrapper injects the bdplib.Bdp client into your Transform.

3. The Transform function

This is the core business logic: map the raw data into stations and measurements, then push them.

info

Each transformer implements its own logic; the example below (off-street parking) is illustrative.

```go
// Besides the SDK packages, this file uses fmt, maps, slices (Go 1.23+) and strconv.

const (
	stationTypeParent = "ParkingFacility" // parent station type
	stationType       = "ParkingStation"  // child station type

	dataTypeFreeTotal     = "free"
	dataTypeOccupiedTotal = "occupied"
)

// Transform converts raw parking data into BDP stations and measurements.
func Transform(ctx context.Context, bdp bdplib.Bdp, payload *rdb.Raw[FacilityData]) error {
	log := logger.Get(ctx)

	var parentStations []bdplib.Station
	stations := make(map[string]bdplib.Station) // child stations by ID

	dataMapParent := bdp.CreateDataMap() // facility-level measurements (none in this example)
	dataMap := bdp.CreateDataMap()

	ts := payload.Timestamp.UnixMilli()

	for _, facility := range payload.Rawdata {
		// 1. Parent station + metadata
		parentID := strconv.Itoa(facility.GetID())
		parentData := station_proto.GetStationByID(parentID)
		if parentData == nil {
			log.Error("no parent station data", "facility_id", parentID)
			return fmt.Errorf("no parent station data for facility %s", parentID)
		}
		parentStation := bdplib.CreateStation(
			parentData.ID, parentData.Name, stationTypeParent,
			parentData.Lat, parentData.Lon, bdp.GetOrigin())
		parentStation.MetaData = parentData.ToMetadata()
		parentStations = append(parentStations, parentStation)

		// 2. Child stations + measurements
		for _, freePlace := range facility.FacilityDetails {
			stationID := parentID + "_" + strconv.Itoa(freePlace.ParkNo)
			stationData := station_proto.GetStationByID(stationID)
			if stationData == nil {
				log.Error("no station data", "station_id", stationID)
				return fmt.Errorf("no station data for %s", stationID)
			}
			if _, ok := stations[stationID]; !ok {
				station := bdplib.CreateStation(
					stationData.ID, stationData.Name, stationType,
					stationData.Lat, stationData.Lon, bdp.GetOrigin())
				station.ParentStation = parentStation.Id // link child to parent
				station.MetaData = stationData.ToMetadata()
				stations[stationID] = station // same key as the lookup above
			}
			dataMap.AddRecord(stationData.ID, dataTypeFreeTotal, bdplib.CreateRecord(ts, freePlace.FreePlaces, 600))
			dataMap.AddRecord(stationData.ID, dataTypeOccupiedTotal, bdplib.CreateRecord(ts, freePlace.CurrentLevel, 600))
		}
	}

	// 3. Batch sync stations, then push measurements.
	// Errors are returned so a failed write is not silently acknowledged.
	if err := bdp.SyncStations(stationTypeParent, parentStations, true, true); err != nil {
		return fmt.Errorf("sync parent stations: %w", err)
	}
	if err := bdp.SyncStations(stationType, slices.Collect(maps.Values(stations)), true, true); err != nil {
		return fmt.Errorf("sync stations: %w", err)
	}
	if err := bdp.PushData(stationTypeParent, dataMapParent); err != nil {
		return fmt.Errorf("push parent data: %w", err)
	}
	if err := bdp.PushData(stationType, dataMap); err != nil {
		return fmt.Errorf("push data: %w", err)
	}
	return nil
}
```

Critical points and patterns:

  • Hierarchical stations: a parent station (ParkingFacility) groups child stations (ParkingStation); station.ParentStation = parentStation.Id establishes the relationship.
  • bdplib.CreateStation(id, name, type, lat, lon, origin) builds a station; assign MetaData to enrich it.
  • bdplib.CreateRecord(ts, value, period) creates a measurement record (ts is Unix milliseconds; period is the aggregation interval in seconds).
  • bdp.SyncStations(type, stations, syncState, onlyActivate): syncState=true updates active/inactive state based on presence in the call; onlyActivate=true only activates existing stations (use false if new stations can appear dynamically).
  • bdp.PushData(type, dataMap) pushes all collected measurements for a station type in one batch call.

The dto.go file defines the Go structs for the raw input schema (for example FacilityData); this is the type parameter passed to tr.NewTr[FacilityData].

4. Deployment with Helm

The Helm values configure the image and the environment, with the OAuth credentials loaded from Kubernetes secrets.

```yaml
image:
  repository: ghcr.io/noi-techpark/opendatahub-transformers/tr-parking-offstreet-skidata
  pullPolicy: IfNotPresent
  tag: "0.0.1"

env:
  LOG_LEVEL: "INFO"
  MQ_QUEUE: s3-poller.parking-offstreet-skidata
  MQ_EXCHANGE: routed
  MQ_KEY: s3-poller.parking-offstreet-skidata
  MQ_CLIENT: tr-parking-offstreet-skidata
  RAW_DATA_BRIDGE_ENDPOINT: "http://raw-data-bridge-service.default.svc.cluster.local:2000/"

  BDP_BASE_URL: https://share.opendatahub.testingmachine.eu
  BDP_PROVENANCE_VERSION: 0.1.0
  BDP_PROVENANCE_NAME: tr-parking-offstreet-skidata
  BDP_ORIGIN: province-bolzano

  SERVICE_NAME: tr-parking-offstreet-skidata
  TELEMETRY_TRACE_GRPC_ENDPOINT: tempo-distributor-discovery.monitoring.svc.cluster.local:4317

envSecretRef:
  - name: MQ_URI
    secret: rabbitmq-svcbind
    key: uri
  - name: BDP_TOKEN_URL
    secret: odh-oauth-client-credentials
    key: token_url
  - name: BDP_CLIENT_ID
    secret: odh-oauth-client-credentials
    key: client_id
  - name: BDP_CLIENT_SECRET
    secret: odh-oauth-client-credentials
    key: client_secret
```

For production, BDP_CLIENT_ID, BDP_CLIENT_SECRET, and BDP_TOKEN_URL should always be loaded from Kubernetes secrets via envSecretRef, never committed to Git.

5. Testing with bdpmock

go-bdp-client ships bdpmock, which fully mocks the bdplib.Bdp interface so you can test the Transform function without a live writer. The pattern feeds a golden input file, runs Transform against the mock, and compares the recorded calls with an expected-output file:

```go
func TestMyBestParking(t *testing.T) {
	var in = FacilityData{}
	station_proto = ReadStations("../resources/stations.csv")
	bdpmock.LoadInputData(&in, "../testdata/input/mybestparking.json")

	timestamp, err := time.Parse("2006-01-02", "2025-01-01")
	require.NoError(t, err)
	raw := rdb.Raw[FacilityData]{Rawdata: in, Timestamp: timestamp}

	var out = bdpmock.BdpMockCalls{}
	bdpmock.LoadOutput(&out, "../testdata/output/mybestparking--out.json")

	b := bdpmock.MockFromEnv() // mock instead of bdplib.FromEnv()
	err = Transform(context.TODO(), b, &raw)
	require.NoError(t, err)

	req := b.(*bdpmock.BdpMock).Requests()
	NormalizeBdpMockCalls(&req) // sort slices for order-independent comparison
	testsuite.DeepEqualFromFile(t, out, req)
}
```

  • bdpmock.MockFromEnv() returns a mock bdplib.Bdp that records every SyncStations, SyncDataTypes, and PushData call instead of sending it.
  • Input/output golden files (testdata/input/*.json, testdata/output/*.json) make the test deterministic: the output file is the exact set of bdplib calls the transformer is expected to make.
  • NormalizeBdpMockCalls sorts the recorded slices so that testsuite.DeepEqualFromFile comparisons are order-independent.