Manage Pipelines
In GreptimeDB, each pipeline
is a collection of data processing units used for parsing and transforming the ingested log content. This document provides guidance on creating and deleting pipelines to efficiently manage the processing flow of log data.
For specific pipeline configurations, please refer to the Pipeline Configuration documentation.
Built-in Pipelines
GreptimeDB offers built-in pipelines for common log formats, allowing you to use them directly without creating new pipelines.
Note that the built-in pipelines are not editable. Additionally, the "greptime_" prefix of the pipeline name is reserved.
greptime_identity
The greptime_identity
pipeline is designed for writing JSON logs and automatically creates columns for each field in the JSON log.
- The first-level keys in the JSON log are used as column names.
- An error is returned if the same field has different types.
- Fields with
null
values are ignored. - An additional column,
greptime_timestamp
, is added to the table as the time index to indicate when the log was written.
Type conversion rules
string
->string
number
->int64
orfloat64
boolean
->bool
null
-> ignorearray
->json
object
->json
For example, if we have the following json data:
[
{"name": "Alice", "age": 20, "is_student": true, "score": 90.5,"object": {"a":1,"b":2}},
{"age": 21, "is_student": false, "score": 85.5, "company": "A" ,"whatever": null},
{"name": "Charlie", "age": 22, "is_student": true, "score": 95.5,"array":[1,2,3]}
]
We'll merge the schema for each row of this batch to get the final schema. The table schema will be:
mysql> desc pipeline_logs;
+--------------------+---------------------+------+------+---------+---------------+
| Column | Type | Key | Null | Default | Semantic Type |
+--------------------+---------------------+------+------+---------+---------------+
| age | Int64 | | YES | | FIELD |
| is_student | Boolean | | YES | | FIELD |
| name | String | | YES | | FIELD |
| object | Json | | YES | | FIELD |
| score | Float64 | | YES | | FIELD |
| company | String | | YES | | FIELD |
| array | Json | | YES | | FIELD |
| greptime_timestamp | TimestampNanosecond | PRI | NO | | TIMESTAMP |
+--------------------+---------------------+------+------+---------+---------------+
8 rows in set (0.00 sec)
The data will be stored in the table as follows:
mysql> select * from pipeline_logs;
+------+------------+---------+---------------+-------+---------+---------+----------------------------+
| age | is_student | name | object | score | company | array | greptime_timestamp |
+------+------------+---------+---------------+-------+---------+---------+----------------------------+
| 22 | 1 | Charlie | NULL | 95.5 | NULL | [1,2,3] | 2024-10-18 09:35:48.333020 |
| 21 | 0 | NULL | NULL | 85.5 | A | NULL | 2024-10-18 09:35:48.333020 |
| 20 | 1 | Alice | {"a":1,"b":2} | 90.5 | NULL | NULL | 2024-10-18 09:35:48.333020 |
+------+------------+---------+---------------+-------+---------+---------+----------------------------+
3 rows in set (0.01 sec)
Create a Pipeline
GreptimeDB provides a dedicated HTTP interface for creating pipelines.
Assuming you have prepared a pipeline configuration file pipeline.yaml
, use the following command to upload the configuration file, where test
is the name you specify for the pipeline:
## Upload the pipeline file. 'test' is the name of the pipeline
curl -X "POST" "http://localhost:4000/v1/events/pipelines/test" -F "file=@pipeline.yaml"
Delete a Pipeline
You can use the following HTTP interface to delete a pipeline:
## 'test' is the name of the pipeline
curl -X "DELETE" "http://localhost:4000/v1/events/pipelines/test?version=2024-06-27%2012%3A02%3A34.257312110Z"
In the above example, we deleted a pipeline named test
. The version
parameter is required to specify the version of the pipeline to be deleted.
Query Pipelines
Currently, you can use SQL to query pipeline information.
SELECT * FROM greptime_private.pipelines;
Please note that if you are using the MySQL or PostgreSQL protocol to connect to GreptimeDB, the precision of the pipeline time information may vary, and nanosecond-level precision may be lost.
To address this issue, you can cast the created_at
field to a timestamp to view the pipeline's creation time. For example, the following query displays created_at
in bigint
format:
SELECT name, pipeline, created_at::bigint FROM greptime_private.pipelines;
The query result is as follows:
name | pipeline | greptime_private.pipelines.created_at
------+-----------------------------------+---------------------------------------
test | processors: +| 1719489754257312110
| - date: +|
| field: time +|
| formats: +|
| - "%Y-%m-%d %H:%M:%S%.3f"+|
| ignore_missing: true +|
| +|
| transform: +|
| - fields: +|
| - id1 +|
| - id2 +|
| type: int32 +|
| - fields: +|
| - type +|
| - logger +|
| type: string +|
| index: tag +|
| - fields: +|
| - log +|
| type: string +|
| index: fulltext +|
| - field: time +|
| type: time +|
| index: timestamp +|
| |
(1 row)
Then, you can use a program to convert the bigint type timestamp from the SQL result into a time string.
timestamp_ns="1719489754257312110"; readable_timestamp=$(TZ=UTC date -d @$((${timestamp_ns:0:10}+0)) +"%Y-%m-%d %H:%M:%S").${timestamp_ns:10}Z; echo "Readable timestamp (UTC): $readable_timestamp"
Output:
Readable timestamp (UTC): 2024-06-27 12:02:34.257312110Z
The output Readable timestamp (UTC)
represents the creation time of the pipeline and also serves as the version number.
Debug
First, please refer to the Quick Start example to see the correct execution of the Pipeline.
Debug creating a Pipeline
You may encounter errors when creating a Pipeline. For example, when creating a Pipeline using the following configuration:
curl -X "POST" "http://localhost:4000/v1/events/pipelines/test" \
-H 'Content-Type: application/x-yaml' \
-d $'processors:
- date:
field: time
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
ignore_missing: true
- gsub:
fields:
- message
pattern: "\\\."
replacement:
- "-"
ignore_missing: true
transform:
- fields:
- message
type: string
- field: time
type: time
index: timestamp'
The pipeline configuration contains an error. The gsub
Processor expects the replacement
field to be a string, but the current configuration provides an array. As a result, the pipeline creation fails with the following error message:
{"error":"Failed to parse pipeline: 'replacement' must be a string"}
Therefore, We need to modify the configuration of the gsub
Processor and change the value of the replacement
field to a string type.
curl -X "POST" "http://localhost:4000/v1/events/pipelines/test" \
-H 'Content-Type: application/x-yaml' \
-d $'processors:
- date:
field: time
formats:
- "%Y-%m-%d %H:%M:%S%.3f"
ignore_missing: true
- gsub:
fields:
- message
pattern: "\\\."
replacement: "-"
ignore_missing: true
transform:
- fields:
- message
type: string
- field: time
type: time
index: timestamp'
Now that the Pipeline has been created successfully, you can test the Pipeline using the dryrun
interface.
Debug writing logs
We can test the Pipeline using the dryrun
interface. We will test it with erroneous log data where the value of the message field is in numeric format, causing the pipeline to fail during processing.
This API is only used to test the results of the Pipeline and does not write logs to GreptimeDB.
curl -X "POST" "http://localhost:4000/v1/events/pipelines/dryrun?pipeline_name=test" \
-H 'Content-Type: application/json' \
-d $'{"message": 1998.08,"time":"2024-05-25 20:16:37.217"}'
{"error":"Failed to execute pipeline, reason: gsub processor: expect string or array string, but got Float64(1998.08)"}
The output indicates that the pipeline processing failed because the gsub
Processor expects a string type rather than a floating-point number type. We need to adjust the format of the log data to ensure the pipeline can process it correctly.
Let's change the value of the message field to a string type and test the pipeline again.
curl -X "POST" "http://localhost:4000/v1/events/pipelines/dryrun?pipeline_name=test" \
-H 'Content-Type: application/json' \
-d $'{"message": "1998.08","time":"2024-05-25 20:16:37.217"}'
At this point, the Pipeline processing is successful, and the output is as follows:
{
"rows": [
[
{
"data_type": "STRING",
"key": "message",
"semantic_type": "FIELD",
"value": "1998-08"
},
{
"data_type": "TIMESTAMP_NANOSECOND",
"key": "time",
"semantic_type": "TIMESTAMP",
"value": "2024-05-25 20:16:37.217+0000"
}
]
],
"schema": [
{
"colume_type": "FIELD",
"data_type": "STRING",
"fulltext": false,
"name": "message"
},
{
"colume_type": "TIMESTAMP",
"data_type": "TIMESTAMP_NANOSECOND",
"fulltext": false,
"name": "time"
}
]
}
It can be seen that the .
in the string 1998.08
has been replaced with -
, indicating a successful processing of the Pipeline.