opensource.google.com

Menu

Producer java library for Data Lineage is now open source

Tuesday, January 28, 2025

Integrating OpenLineage producers with GCP Lineage just got a lot easier


What is Data Lineage

Data Lineage is a GCP feature that allows tracking data movement. This tool helps data owners and analysts detect anomalies in data flows, find connections between data sources and verify the potential consequences of planned changes in data pipelines.

Lineage is injected automatically for some Google Cloud products (BigQuery, Cloud Data Fusion, Cloud Composer, Dataproc, Vertex AI). That means, if Lineage integration with any of those products is enabled in the projects, data movements coming from executing jobs by these products will be reported to GCP Lineage.

For custom integrations, the API can be used to report and fetch lineage.

After injecting, lineage can be viewed in the Google Cloud console (available from DataCatalog UI, BigQuery UI, Vertex UI). There are two representations: graph view, with data sources as nodes and data movements as edges, and list view, a tabular representation. Lineage information can also be fetched from the API.

More information is available in the documentation.


GCP Lineage information model

We describe data flows using the following concepts:

  • Process is a definition of some data transformation. For example, a SQL or Spark script.
  • Run is an execution of a Process.
  • Lineage Event is a data transformation event. It is reported in context of a Run.
  • A Link represents a connection between two data sources, when data in the link’s Target depends on its Source. A Lineage Event contains a list of Links.

OpenLineage support

OpenLineage is an open standard for reporting lineage information. It unifies lineage reporting between systems, which means the events generated in this format can be consumed by any product supporting it. This leads to more flexibility: adding or replacing a lineage producer does not imply changing the consumer, and vice versa.

OpenLineage format is adopted by a number of lineage producers and consumers, meaning there is already tooling available to report lineage from/to those systems. GCP Lineage is one of those consumers: users can report events in OpenLineage format, see the resulting lineage on the UI, and query it via the API.

OpenLineage is the preferred method for reporting lineage in GCP Lineage. It is used by the Dataproc lineage integration. To find out more about sending OpenLineage events to GCP Lineage refer to the documentation.

After injecting lineage in OpenLineage format, it can be accessed in the same way as if it was injected via other API methods or automatically: from the Google Cloud console or the API.


Why producer library

The GCP Lineage producer library is an extension of the client library. Client libraries are recommended for calling Cloud APIs programmatically. They handle low level API call details, leaving the necessary user code simpler and shorter.

The producer library further simplifies integration by providing ready to use code needed to call the API from Java. It adds additional functionality such as synchronous and asynchronous clients, translating OpenLineage JSON messages to the API friendly format, error handling etc.

Using the producer library, all the code needed to send a request to GCP Lineage API is:

SyncLineageProducerClient client = SyncLineageProducerClient.create();
ProcessOpenLineageRunEventRequest request =
        ProcessOpenLineageRunEventRequest.newBuilder()
            .setParent(parent)
            .setOpenLineage(openLineageMessage)
            .build();
client.processOpenLineageRunEvent(request);

The field openLineageMessage here is a protobuf Struct that includes information about job execution, inputs and outputs and other metadata. The object model is described in the documentation. An example message is:

{
  "eventType": "START",
  "eventTime": "2023-04-04T13:21:16.098Z",
  "run": {
    "runId": "502483d6-3e3d-474f-9380-da565eaa7516",
    "facets": {
       "spark_properties": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet",
        "properties": {
          "spark.master": "yarn",
          "spark.app.name": "sparkJobTest.py"
        }
      }
    }
  },
  "job": {
    "namespace": "project-name",
    "name": "cluster-name",
    "facets": {
    "jobType": {
        "_producer": "https://github.com/OpenLineage/OpenLineage/tree/1.22.0/integration/spark",
        "_schemaURL": "https://openlineage.io/spec/facets/2-0-3/JobTypeJobFacet.json#/$defs/JobTypeJobFacet",
        "processingType": "BATCH",
        "integration": "SPARK",
        "jobType": "SQL_JOB"
      },

    }
  },
  "inputs": [
    {
      "namespace": "bigquery",
      "name": "project.dataset.input_table",
    }],
  "outputs": [
   {
      "namespace": "bigquery",
      "name": "project.dataset.output_table",
    }],
  "producer": "https://github.com/OpenLineage/OpenLineage/tree/0.18.0/integration/spark",
  "schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/$defs/RunEvent"
}

Learn more about building an OpenLineage message.


Best Practices for Constructing OpenLineage Messages

The openLineageMessage should follow the OpenLineage format. The fields that are required for correct parsing by the GCP Lineage API are:

job

mapped to Process

job.namespace

used to construct Process name

job.name

used to construct Process name

run

mapped to Run

run.runId

used to construct Run name

producer

URI identifying the producer of this metadata

eventTime

time of the data movement

schemaURL

URL pointing to the schema definition for this message

In addition to those, the fields used to create lineage are:

eventType

corresponds to the status of the Run

inputs

mapped to sources of links. Must be specified according to the naming conventions

outputs

mapped to targets of links. Must be specified according to the naming conventions

The GCP Lineage API supports OpenLineage major versions 1 and 2. For more information please refer to the documentation.


How to access GCP Lineage?

The code is now publicly available on GitHub. The library is also published to Maven.


GcpLineageTransport

To simplify integration with GCP Lineage, we offer GcpLineageTransport. It is available on the OpenLineage GitHub repository and is built to a separate maven artifact. It is built on top of the producer library mentioned above.

Using the transport minimises the code for sending events to GCP Lineage. The GcpLineageTransport can be configured as the event sink for any existing OpenLineage producer such as Airflow, Spark, and Flink. Find more information and examples on GCP Lineage.

By Mary Idamkina – Data Lineage

.