logstash 目录下的内容是提前从容器中复制出来的一份,其中修改了 logstash.yml 里连接 es 的配置
logstash 连接器用于连接 MySQL,驱动下载地址:https://mvnrepository.com/artifact/mysql/mysql-connector-java
如果自己写的脚本运行报错,会在【config】目录下产生一个 .lock 文件,记得删除
elasticsearch.username: "kibana"
elasticsearch.password: "123456"
docker-compose up -d
docker exec -it elasticsearch sh
bin/elasticsearch-setup-passwords interactive
Enter password for [elastic]:
Reenter password for [elastic]:
Enter password for [apm_system]:
Reenter password for [apm_system]:
Enter password for [kibana_system]:
Reenter password for [kibana_system]:
Enter password for [logstash_system]:
Reenter password for [logstash_system]:
Enter password for [beats_system]:
Reenter password for [beats_system]:
Enter password for [remote_monitoring_user]:
Reenter password for [remote_monitoring_user]:
Changed password for user [apm_system]
Changed password for user [kibana_system]
Changed password for user [kibana]
Changed password for user [logstash_system]
Changed password for user [beats_system]
Changed password for user [remote_monitoring_user]
Changed password for user [elastic]
127.0.0.1:5601
账号 elastic
密码 123456
启动后默认连接数据库 test 库,读取 att_faq 表的数据,将 id、content 两个字段存储到 es 的 test 索引中(不需要的话,修改 pipelines.yml 配置)
ls -l /usr/share/logstash/config/mysql-connector-java-5.1.49.jar
chmod +r /usr/share/logstash/config/mysql-connector-java-5.1.49.jar
使用了分词
<?php
namespace App\Http\Controllers;
use Elastic\Elasticsearch\ClientBuilder;
use Illuminate\Support\Facades\DB;
/**
 * Demo controller exercising basic Elasticsearch operations (index management,
 * document CRUD, bulk indexing, counting and searching) against one index.
 *
 * Every action prints or dumps the raw Elasticsearch response for inspection.
 */
class ElasticSearch extends Controller
{
    /** Name of the index every action in this controller operates on. */
    private const INDEX = 'ik';

    /**
     * Elasticsearch client, built in the constructor.
     *
     * @var \Elastic\Elasticsearch\Client|null
     */
    public $client = null;

    /**
     * Build the Elasticsearch client with basic authentication.
     */
    public function __construct()
    {
        // NOTE(review): host and credentials are hard-coded; consider moving
        // them to configuration before using this outside a local demo.
        $this->client = ClientBuilder::create()
            ->setHosts(['http://elasticsearch:9200'])
            ->setBasicAuthentication('elastic', '123456')
            ->build();
    }

    /**
     * Print the cluster info response inside a <pre> block.
     *
     * @return void
     */
    public function infos()
    {
        $response = $this->client->info();
        echo "<pre>";
        print_r($response);
    }

    /**
     * Create the index with a `content` text field analyzed by `ik_max_word`.
     *
     * @return void
     */
    public function esCreateIk()
    {
        $params = [
            'index' => self::INDEX,
            'body' => [
                'mappings' => [
                    'properties' => [
                        'content' => [
                            'type' => 'text',
                            'analyzer' => 'ik_max_word',
                        ],
                    ],
                ],
            ],
        ];

        $response = $this->client->indices()->create($params);
        dd($response->asArray());
    }

    /**
     * Dump whether the index exists (boolean).
     *
     * @return void
     */
    public function isIndex()
    {
        $exists = $this->client->indices()
            ->exists(['index' => self::INDEX])
            ->asBool();
        dd($exists);
    }

    /**
     * Dump the mapping of the index.
     *
     * @return void
     */
    public function indexInfo()
    {
        $response = $this->client->indices()->getMapping(['index' => self::INDEX]);
        dd($response->asArray());
    }

    /**
     * Delete the index together with all of its documents.
     *
     * @return void
     */
    public function indexDelete()
    {
        $response = $this->client->indices()->delete(['index' => self::INDEX]);
        dd($response->asArray());
    }

    /**
     * Delete the document with id 1 from the index.
     *
     * @return void
     */
    public function esDelete()
    {
        $params = [
            'index' => self::INDEX,
            'id' => 1,
        ];

        $response = $this->client->delete($params);
        dd($response->asArray());
    }

    /**
     * Index a single sample document with id 1.
     *
     * No mapping `type` is sent: mapping types were removed in Elasticsearch 8,
     * so the former `'type' => 'doc'` entry carried no meaning.
     *
     * @return void
     */
    public function esCreateIkData()
    {
        $params = [
            'index' => self::INDEX,
            'id' => 1,
            'body' => [
                'content' => '测试数据',
            ],
        ];

        $response = $this->client->index($params);
        dd($response->asArray());
    }

    /**
     * Bulk-index every row of the `faq` table, using each row's `id` as the
     * document id and its `content` column as the document body.
     *
     * All rows are sent in one bulk request.
     * NOTE(review): for a large table consider splitting into several requests.
     *
     * @return void
     */
    public function eaCreateIkDataBulk()
    {
        set_time_limit(0);

        $rows = DB::table('faq')->get();

        // With no rows there is no bulk body to send; previously this path
        // passed an undefined variable to bulk(). Dump an empty result instead.
        if ($rows->isEmpty()) {
            dd(['errors' => false, 'items' => []]);
            return;
        }

        // The bulk body alternates an action line with the document source.
        $params = ['body' => []];
        foreach ($rows as $row) {
            $params['body'][] = ['index' => ['_index' => self::INDEX, '_id' => $row->id]];
            $params['body'][] = ['content' => $row->content];
        }

        $response = $this->client->bulk($params);
        dd($response->asArray());
    }

    /**
     * Print the count response for the index.
     *
     * @return void
     */
    public function esCountData()
    {
        $params = [
            'index' => self::INDEX,
        ];

        echo $this->client->count($params);
    }

    /**
     * Search the index without any query (matches all documents).
     *
     * Elasticsearch returns at most 10 hits by default.
     *
     * @return void
     */
    public function esIkSearch()
    {
        // The search API takes no document id; it only accepts a query.
        $params = [
            'index' => self::INDEX,
        ];

        $response = $this->client->search($params);
        dd($response->asArray());
    }

    /**
     * Search the index with a `match` query on `content`, restricting the
     * returned fields and paginating the hits.
     *
     * @return void
     */
    public function esIkSearchWhere()
    {
        $params = [
            'index' => self::INDEX,
            'body' => [
                'query' => [
                    'match' => [
                        'content' => '被骗了几千块钱,有微信怎么要回来',
                    ],
                ],
            ],
            // Optional: fields to return; omit to get the whole document.
            '_source' => ['content'],
            // Optional: number of hits to return.
            'size' => 5,
            // Optional: offset of the first hit; with size 5 this is the
            // equivalent of SQL `LIMIT 2, 5` (skip 2 rows, return 5).
            'from' => 2,
        ];

        $response = $this->client->search($params);
        dd($response->asArray());
    }

    /**
     * Partially update the document with id 1, replacing its `content`.
     *
     * @return void
     */
    public function esIkedit()
    {
        $params = [
            'index' => self::INDEX,
            'id' => 1,
            'body' => [
                'doc' => [
                    'content' => '修改数据',
                ],
            ],
        ];

        $response = $this->client->update($params);
        dd($response->asArray());
    }
}
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"strings"
elasticsearch "github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-elasticsearch/v8/esapi"
)
// main connects to a local Elasticsearch node, verifies that the node answers
// successfully, and then runs the add, query and delete demos in order,
// printing each response.
func main() {
	cfg := elasticsearch.Config{
		Addresses: []string{
			"http://127.0.0.1:9200",
		},
		Username: "elastic",
		Password: "123456",
	}
	es, err := elasticsearch.NewClient(cfg)
	if err != nil {
		log.Fatalf("Error creating the client: %s", err)
	}

	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	// A nil error only means the HTTP round trip succeeded; a 401 (bad
	// credentials) or 5xx still arrives here, so check the status as well.
	// The body is closed before a possible log.Fatalf, which skips defers.
	failed, status := res.IsError(), res.Status()
	res.Body.Close()
	if failed {
		log.Fatalf("Error response from Elasticsearch: %s", status)
	}
	fmt.Println("connect to es success")

	// Index a document. The helpers hand back open responses, so each body
	// is closed here once main is done with it.
	addRes := add(es)
	defer addRes.Body.Close()
	fmt.Println("新增:", addRes)

	// Search for the document.
	queryRes := query(es)
	defer queryRes.Body.Close()
	fmt.Println("query:", queryRes.String())

	// Delete a document by id.
	deleteRes := delete(es)
	defer deleteRes.Body.Close()
	fmt.Println("delete:", deleteRes)
}
// add indexes a sample document into the "test" index, refreshing the index
// immediately and limiting the response to the "result" and "_id" fields.
// It panics if the request fails. The response body is not closed here.
func add(es *elasticsearch.Client) *esapi.Response {
	const document = `{"title" : "hello word"}`

	options := []func(*esapi.IndexRequest){
		es.Index.WithRefresh("true"),
		es.Index.WithPretty(),
		es.Index.WithFilterPath("result", "_id"),
	}

	resp, err := es.Index("test", strings.NewReader(document), options...)
	if err != nil {
		panic(err)
	}
	return resp
}
// query runs a match search on the "title" field against the "test" index and
// returns the raw response. It exits via log.Fatalf if the query cannot be
// encoded and panics if the request fails. The response body is not closed here.
func query(es *elasticsearch.Client) *esapi.Response {
	search := map[string]interface{}{
		"query": map[string]interface{}{
			"match": map[string]interface{}{
				"title": "hello word",
			},
		},
	}

	// Serialize the query into a buffer that serves as the request body.
	body := new(bytes.Buffer)
	if err := json.NewEncoder(body).Encode(search); err != nil {
		log.Fatalf("Error encoding query: %s", err)
	}

	options := []func(*esapi.SearchRequest){
		es.Search.WithContext(context.Background()),
		es.Search.WithIndex("test"),
		es.Search.WithBody(body),
		es.Search.WithTrackTotalHits(true),
		es.Search.WithPretty(),
	}

	resp, err := es.Search(options...)
	if err != nil {
		panic(err)
	}
	return resp
}
// delete removes the document with a fixed id from the "test" index and
// returns the raw response. It panics if the request fails. The response body
// is not closed here.
func delete(es *elasticsearch.Client) *esapi.Response {
	const (
		index = "test"
		docID = "dae5NoYBvw08KezVlFPX"
	)

	resp, err := es.Delete(index, docID)
	if err != nil {
		panic(err)
	}
	return resp
}