1. Introduction
Every organization gathers data, presenting an opportunity for meaningful action: informed decision-making, problem-solving, and fostering business growth. Achieving these objectives requires reliable, high-performing data practices that are trusted throughout the organization.
The challenge of timely access to the right data for business analytics can be a major hurdle for companies modelled on a centralized IT distribution approach. Even the smallest of delays can have a significant impact on competitive advantage. One way around this is to decentralize the data ingestion process by using standardized patterns and automation. Imagine a scenario where the business can self-serve the procurement of their data to develop their insights within their timeframes and capabilities. This white paper describes a framework for delivering such a capability.
Customers often present highly intricate requirements, and while the available technology can meet their needs, the complexity of the required tools demands a significant level of expertise for successful implementation. StreamSets software stands out by offering a distinctive combination of graphical and programmatic approaches across the entire platform lifecycle, encompassing deployment, configuration, design, and pipeline release. This unique feature enables us to establish best practice implementations, referred to as "patterns," for common scenarios. These patterns provide companies with comprehensive guidance, including architecture, design, and code samples, thereby reducing risk and accelerating delivery. Crafted by StreamSets engineers, these patterns consider recurring scenarios encountered during customer interactions, leverage deep product knowledge, and encompass all core elements necessary for a correct initial implementation. Furthermore, they are designed to be extensible, allowing customers to tailor them to their specific needs and architectures.
1.1 StreamSets
StreamSets, a Software AG company, eliminates data integration friction in complex hybrid and multi-cloud environments to keep pace with “need-it-now” business data demands. Our platform lets data teams unlock data, in a safe and governed way, to enable a data-driven enterprise. Resilient and repeatable pipelines deliver analytics-ready data that improve real-time decision-making and reduce the costs and risks associated with data flow across an organization. That’s why the largest companies in the world trust StreamSets to power millions of data pipelines for modern analytics, data science, smart applications, and hybrid integration.
1.2 Intended use
This document is intended to be used by a central IT function (aka data platform provider) with responsibility for maintaining ingestion patterns (i.e., data ingestion types) in conjunction with business unit functions (aka data platform tenant subscribers) responsible for maintaining job templates (i.e., parameterized data ingestion jobs customized to meet business needs).
It is common for organizations to have relatively simple data movement tasks used to ingest data from source systems into an analytical platform, be it a data lake, a data warehouse, or a combination of the two. Similar tasks include moving data between source systems and MDM repositories, or publishing enriched data from data warehouses to applications. Each of these tasks is simple enough to implement as a one-off when the organization has a data ingestion platform with drag-and-drop interfaces, but organizations typically have hundreds of these feeds to implement, manage, and execute. And while drag-and-drop user interfaces are easy for data engineers to use, they are not designed for self-service consumption by end users; this is where StreamSets' automation comes into play, delivering the self-service experience the enterprise needs.
2. Framework overview
2.1 Subscription function
The subscription function is the front end of the pattern process, where the business requirements for data provision are captured and validated. This can be accomplished with a simple spreadsheet, built out as a more feature-rich web capability, or incorporated into an existing version control system.
It is the responsibility of the Data Subscription function to ensure that the metadata store is populated with the correct values for downstream scheduling (pattern selection) and downstream orchestration (parametrized job template creation).
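For illustration, a minimal sketch of how a validated subscription request might be written to the metadata store is shown below. The table and column names are assumptions made for this example, not a prescribed schema, and psycopg2 is used as the Postgres adapter.

```python
# Minimal sketch: capturing a validated subscription request in the metadata store.
# Table and column names are illustrative assumptions, not a prescribed schema.
import psycopg2
from psycopg2.extras import Json

subscription_request = {
    "requestor": "finance-analytics",
    "source_type": "oracle",        # drives downstream pattern selection
    "target_type": "snowflake",
    "schedule": "0 2 * * *",        # optional future runtime schedule
    "parameters": {"SOURCE_TABLE": "INVOICES", "TARGET_SCHEMA": "FIN_STG"},
}

conn = psycopg2.connect("dbname=metadata user=ingest")
with conn, conn.cursor() as cur:
    cur.execute(
        """
        INSERT INTO ingestion_request
            (requestor, source_type, target_type, schedule, parameters, status)
        VALUES (%s, %s, %s, %s, %s, 'NEW')
        """,
        (
            subscription_request["requestor"],
            subscription_request["source_type"],
            subscription_request["target_type"],
            subscription_request["schedule"],
            Json(subscription_request["parameters"]),
        ),
    )
conn.close()
```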
2.2 Scheduling function
The scheduling function identifies new data ingestion requests, triggers the automation process (including ingestion pattern selection for downstream orchestration processing), and, if required, establishes a future runtime job schedule. All the details required to support the scheduling function are stored in the metadata store.
For a basic level of functionality, the StreamSets scheduler can be used to run a regular polling job to scan the metadata store for new requests. However, it is more likely that organizations will want to integrate the automation process into their scheduling capability to accommodate more advanced levels of functionality such as data governance compliance, processing order, exception handling, notifications, and data quality checks.
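As a sketch of the basic polling approach, assuming the same illustrative metadata tables as above, a scheduler could scan for new requests and hand each one to the orchestration function:

```python
# Minimal sketch: a polling scheduler that scans the metadata store for new
# subscription requests and hands each one to the orchestration function.
# Table, column, and callable names are illustrative assumptions.
import time

import psycopg2

POLL_INTERVAL_SECONDS = 60

def poll_for_new_requests(orchestrate):
    """orchestrate(request_id, source_type, target_type, parameters) is supplied by the orchestration function."""
    conn = psycopg2.connect("dbname=metadata user=ingest")
    while True:
        with conn, conn.cursor() as cur:
            cur.execute(
                "SELECT request_id, source_type, target_type, parameters "
                "FROM ingestion_request WHERE status = 'NEW'"
            )
            for request_id, source_type, target_type, parameters in cur.fetchall():
                orchestrate(request_id, source_type, target_type, parameters)
                cur.execute(
                    "UPDATE ingestion_request SET status = 'SUBMITTED' WHERE request_id = %s",
                    (request_id,),
                )
        time.sleep(POLL_INTERVAL_SECONDS)
```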
2.3 Orchestration function
Invoked by the scheduling function and using the pattern details provided, the orchestration function reads the metadata store to capture the caller request parameters (i.e., the mandatory and optional values provided during the data subscription phase) and to select the parameterized job template(s) and default runtime settings. The orchestration function uses these details to populate the job templates, build the pipelines and jobs, and, if required, provision the necessary runtime infrastructure for downstream job runner execution.
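A minimal sketch of this flow using the StreamSets Platform SDK is shown below. The table and column names are illustrative assumptions, and the exact SDK calls may vary by SDK version; treat this as a sketch rather than the PoC implementation.

```python
# Minimal sketch: the orchestration function selects a parameterized Job Template
# from the metadata store and starts an instance of it with the Platform SDK.
# Table/column names are assumptions; SDK calls may vary by version.
import json

import psycopg2
from streamsets.sdk import ControlHub

def orchestrate(request_id, source_type, target_type, caller_parameters):
    conn = psycopg2.connect("dbname=metadata user=ingest")
    with conn, conn.cursor() as cur:
        # Pattern selection: map the caller's source/target types to a job template
        # and its static (default) runtime parameters.
        cur.execute(
            """
            SELECT jt.sch_job_template_id, jt.static_parameters
            FROM ingestion_pattern ip
            JOIN ingestion_pattern_job_template rel ON rel.pattern_id = ip.pattern_id
            JOIN job_template jt ON jt.job_template_id = rel.job_template_id
            WHERE ip.source_type = %s AND ip.target_type = %s
            """,
            (source_type, target_type),
        )
        sch_template_id, static_parameters = cur.fetchone()
    conn.close()

    # Dynamic (caller-supplied) values override the static defaults from the store.
    static = static_parameters if isinstance(static_parameters, dict) else json.loads(static_parameters)
    dynamic = caller_parameters if isinstance(caller_parameters, dict) else json.loads(caller_parameters)
    runtime_parameters = {**static, **dynamic}

    sch = ControlHub(credential_id="<CRED_ID>", token="<TOKEN>")
    job_template = sch.jobs.get(job_id=sch_template_id)
    return sch.start_job_template(job_template, runtime_parameters=[runtime_parameters])
```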
2.4 Job Runner function
Using the StreamSets environment, the Job Runner function runs the automatically orchestrated pipelines and jobs to read the required data from the source, apply the required transformations, and write to the required destination. To meet operational support requirements, runtime events and statistics are written to the metadata store.
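A minimal sketch of persisting runtime events and statistics to the metadata store is shown below; the table layout and the shape of the metrics dictionary are assumptions for illustration.

```python
# Minimal sketch: persisting runtime events and statistics for a completed job
# instance into the metadata store. Column names and the shape of the metrics
# dictionary are illustrative assumptions.
from datetime import datetime, timezone

import psycopg2
from psycopg2.extras import Json

def record_job_run(request_id, sch_job_instance_id, status, metrics):
    """metrics is a plain dict, e.g. {"input_count": 1000, "error_count": 0}."""
    conn = psycopg2.connect("dbname=metadata user=ingest")
    with conn, conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO job_instance (request_id, sch_job_instance_id, status, metrics, finished_at)
            VALUES (%s, %s, %s, %s, %s)
            """,
            (request_id, sch_job_instance_id, status, Json(metrics), datetime.now(timezone.utc)),
        )
    conn.close()
```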
2.5 Metadata store function
The metadata store provides an organized information repository that enables scalable pattern complexity and ease of integration into already established data platforms. The data model in section 6 is provided as an organizational example that minimizes duplication of pattern types and parameterized job templates.
3. Proof of Concept example
This PoC provides an example of how to use the StreamSets Platform SDK to parameterize and start Job Template instances based on parameters retrieved from a database table. The source code and files for the PoC can be found in the GitHub repository (see section 3.4).
3.1 A guided walk-through of the process
In this example:
- A Data Analyst submits a request to run a Job through an app that makes REST API calls to the Job-Template-Service, or a scheduler such as Apache Airflow uses Python bindings to call the Python Job Template Runner script directly.
- The Job Template that is run is dynamically selected based on rules applied to the request's source-type and target-type values.
- A subset of the Job's runtime parameters are passed in by the caller as part of the request, which we can consider as "dynamic" runtime parameters, and additional pipeline and connection parameters are retrieved from the configuration store, which we can consider as "static" runtime parameters.
- A Python application built using the StreamSets SDK selects the appropriate Job Template and retrieves the Job Template configuration and static parameters from a set of database tables.
- The Python Application creates and starts Job Template Instance(s) that StreamSets Control Hub schedules on engines.
- The Python Application spawns a new thread per Job Template Instance; each thread waits until its instance completes, then gathers the instance metrics and inserts them into a database table (a simplified sketch of this step follows the list).
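A simplified sketch of the thread-per-instance monitoring step is shown below. The wait, metrics, and storage operations are passed in as callables because the exact SDK and DatabaseManager calls used in the PoC are not reproduced here; this is an illustrative pattern, not the repository code.

```python
# Simplified sketch of the thread-per-instance monitoring described above.
# The wait/metrics/storage operations are injected as callables; in the PoC they
# wrap the StreamSets SDK and the DatabaseManager class.
import threading

def monitor_instances(instances, wait_for_completion, fetch_metrics, store_metrics):
    """Spawn one monitoring thread per Job Template instance."""
    def monitor(instance):
        wait_for_completion(instance)                     # block until the instance finishes
        store_metrics(instance, fetch_metrics(instance))  # persist metrics to the database

    threads = [
        threading.Thread(target=monitor, args=(instance,), daemon=True)
        for instance in instances
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
```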
3.2 Prerequisites
In this example, the following components are required:
- A PostgreSQL database
- Python 3.8+
- Psycopg - PostgreSQL database adapter for Python
- Flask - Python web application framework
- StreamSets Platform SDK for Python v6.0.1+
- StreamSets Platform API Credentials for a user with permissions to start Jobs
The following is an example cloud environment used to develop the pattern framework.
3.3 Implementation details
In this example:
- The caller’s input requirements are posted as a JSON payload to a REST API service provided by Flask, a lightweight web framework for building applications in Python. Runtime parameters are captured as dynamic values passed by the caller (e.g., “HTTP_URL”, “GCS_Bucket”) and as static values pre-defined and referenced from the metadata store (e.g., “HTTP_Method”, “HTTP_Mode”, “GCS_Connection”). A simplified sketch of this endpoint follows the list.
- The REST API endpoint calls the “run_job_template” method in the Python file “job_template_runner.py”. The type of job template to run is pre-defined in the metadata store based on the caller-provided source and target values.
- Interaction with the StreamSets Platform is managed by the class “StreamSetsManager” in the Python file “streamsets_manager.py”.
- Interaction with the database is managed by the class “DatabaseManager” in the Python file “database_manager.py”.
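A minimal sketch of the Flask front end described above is shown below. The route path and payload keys are assumptions for illustration; the exact signatures in the PoC repository may differ.

```python
# Minimal sketch of the Flask front end: the caller's requirements arrive as a
# JSON payload and are handed to run_job_template(). Route path and payload keys
# are illustrative assumptions; the PoC repository may differ.
from flask import Flask, jsonify, request

from job_template_runner import run_job_template  # PoC module named in the text

app = Flask(__name__)

@app.route("/job-template-service/run", methods=["POST"])
def run_template():
    payload = request.get_json(force=True)
    # Dynamic runtime parameters supplied by the caller (e.g. HTTP_URL, GCS_Bucket);
    # static parameters (e.g. HTTP_Method, GCS_Connection) are resolved from the
    # metadata store inside run_job_template().
    result = run_job_template(
        source_type=payload["source-type"],
        target_type=payload["target-type"],
        runtime_parameters=payload.get("parameters", {}),
    )
    return jsonify(result), 202

if __name__ == "__main__":
    app.run(port=8080)
```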
3.4 Configuration and runtime details
Please reference the GitHub repository for details: https://github.com/streamsets/data-integration-patterns
4. Different approaches to implementation
4.1 Basic pattern
A pattern implementation in its simplest form (i.e., with minimal IT platform integration requirements) is achieved using a REST API data subscription front end where the callers’ requirements are submitted at the command line level. This places an emphasis on the caller submitting correctly formatted input, but it removes the more time- and resource-consuming requirement to develop a new front-end system or integrate with an existing one (e.g., a web portal, GitLab service, etc.).
After submitting a data ingestion request, the caller queries the metadata store to determine the job status and outcome.
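A minimal sketch of the basic pattern from the caller's side, assuming an illustrative endpoint path, payload layout, and status table, might look like this:

```python
# Minimal sketch of the basic pattern from the caller's side: submit the data
# subscription over REST, then query the metadata store for the job outcome.
# Endpoint path, payload keys, and status table are illustrative assumptions.
import psycopg2
import requests

payload = {
    "source-type": "oracle",
    "target-type": "snowflake",
    "parameters": {"SOURCE_TABLE": "INVOICES"},
}
response = requests.post("http://job-template-service:8080/job-template-service/run", json=payload)
request_id = response.json()["request_id"]

# Later, the caller inquires on the metadata store for job status and outcome.
conn = psycopg2.connect("dbname=metadata user=analyst")
with conn, conn.cursor() as cur:
    cur.execute(
        "SELECT status, metrics FROM job_instance WHERE request_id = %s",
        (request_id,),
    )
    print(cur.fetchone())
conn.close()
```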
4.2 Optimized pattern
With an automated pattern capability established, further step improvements can be introduced to simplify the customer experience and expand the scope of ingestion scenarios, such as:
- The addition of a drop-down selection menu for patterns and parameters (i.e., subscribing to a web forms provider or capability that captures the caller’s input through drop-down menu selections and submits the request in a REST API format).
- Reducing the number of caller parameter selection requirements by aligning predetermined settings to department level defaults.
- Expanding the metadata store with a wider range of caller request templates.
4.3 Advanced pattern
With the addition of data lineage and chatbot technology, data analysts could conceivably reach their data ingestion outcomes using free-form data inquiries and intelligence guided by the metadata store.
Other considerations could include:
- Improved logging, auditing, monitoring, and error handling
- Improved security and access control
- Improved scalability and maintenance support
The application of this pattern is suitable for various scenarios, including migration to the cloud, data ingestion into a data lake, and data ingestion into a staging area in a data warehouse. Attempting these tasks without the proper conditions poses challenges, such as dealing with numerous feeds, extensive manual efforts, and the risk of losing control and visibility over the process.
5. Customer case: A large energy company
5.1 Overview and challenges
To expedite insights for the analyst community and minimize IT costs, a large energy tech pioneer sought a platform capable of automating data collection to a public cloud for processing. The challenge was to make this data accessible to diverse applications serving groups with distinct analytic needs, without duplicating data storage. Due to the extensive volume and diversity of data, a one-size-fits-all approach to data management and processing proved impractical.
The system handled a substantial daily data load, reaching billions of records from systems spanning Billing, Customer Identity, Payments, Industry data, Metering, Pricing/Markets, Forecasting, and Regulatory categories. The data originated from various technologies like Oracle, Postgres, SQL Server, S3, flat files, and APIs. Simultaneously, there was an ongoing effort to migrate data and workloads from the existing SQL Server infrastructure to enhance efficiency and accommodate evolving requirements.
The objective was to establish a secure, multitenant architecture facilitating self-service data access and the ability to transform data into formats required by data scientists.
5.2 Solution
StreamSets facilitated the ingestion of data into cloud data warehouses, specifically Snowflake and Databricks, as well as the egress of data from the cloud to SQL Server and other existing legacy systems. The implementation involved the use of 'pipelines as code,' enabling streamlined operations with minimal maintenance requirements. The platform supports a diverse array of use cases and can accommodate a substantial workload, handling up to 1000 jobs per day, with the potential for growth, particularly during peak periods such as month-end processes.
5.3 Result
The implementation of StreamSets has yielded significant outcomes, ensuring dependable daily provisioning of data to over 400 users and addressing their analytic needs effectively. This has resulted in a notable reduction in the time required for the analyst community to deliver new assets, while lowering the bar for technical proficiency among analysts. The platform's efficiency has also reduced pressure and expenses for central IT teams, particularly in relation to data pipeline builds and fixes. With 500 jobs executed daily, StreamSets demonstrates robust performance, ingesting data from 100 different databases, APIs, and file systems, including billions of records from large tables.
The evaluation criteria, encompassing the speed of provisioning necessary components in the cloud, ease of development, customer support, and the ability to influence product development, showcase StreamSets as a comprehensive solution that not only meets operational demands but also provides a user-friendly experience with strategic impact.
6. Metadata store data model
6.1 Ingestion Pattern
The Ingestion Pattern table holds the key details for a given ingestion pattern and provides the base definition of the pattern.
6.2 Job Template
The Job Template table holds the information on the job definition and the associated parameters and is used to create a job instance that will perform the data ingestion.
6.3 Ingestion Pattern & Job Template Relationship
The Ingestion Pattern and Job Template Relationship table holds the relationship between ingestion patterns and the associated job templates.
6.4 Job Instance
The Job Instance table holds the information on a particular job instance and other logging information on the job runs.
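As an illustration of the data model described in this section, a minimal PostgreSQL DDL sketch is shown below; the column names and types are assumptions, with only the four tables and their relationships taken from the text.

```python
# Minimal sketch of the metadata store data model described in this section,
# expressed as PostgreSQL DDL executed through psycopg2. Column names and types
# are illustrative assumptions; only the four tables and their relationships
# come from the text.
import psycopg2

DDL = """
CREATE TABLE IF NOT EXISTS ingestion_pattern (
    pattern_id      SERIAL PRIMARY KEY,
    pattern_name    TEXT NOT NULL,           -- base definition of the pattern
    source_type     TEXT NOT NULL,
    target_type     TEXT NOT NULL
);

CREATE TABLE IF NOT EXISTS job_template (
    job_template_id     SERIAL PRIMARY KEY,
    sch_job_template_id TEXT NOT NULL,       -- Control Hub job template ID
    static_parameters   JSONB                -- default runtime settings
);

CREATE TABLE IF NOT EXISTS ingestion_pattern_job_template (
    pattern_id      INT REFERENCES ingestion_pattern (pattern_id),
    job_template_id INT REFERENCES job_template (job_template_id),
    PRIMARY KEY (pattern_id, job_template_id)
);

CREATE TABLE IF NOT EXISTS job_instance (
    job_instance_id     SERIAL PRIMARY KEY,
    request_id          INT,                 -- link back to the caller's request
    job_template_id     INT REFERENCES job_template (job_template_id),
    sch_job_instance_id TEXT,
    status              TEXT,
    metrics             JSONB,               -- logging information on the job run
    finished_at         TIMESTAMPTZ
);
"""

conn = psycopg2.connect("dbname=metadata user=ingest")
with conn, conn.cursor() as cur:
    cur.execute(DDL)
conn.close()
```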