ORM that delivers support for full stack data access:
- MySQL - for relational data
- Redis - for NoSQL in memory shared cache
- Elastic Search - for full text search
- Local Cache - in memory local (not shared) cache
- ClickHouse - time series database
Menu:
- Configuration
- Defining entities
- Creating registry
- Creating engine
- Checking and updating table schema
- Adding, editing, deleting entities
- Transactions
- Loading entities using primary key
- Loading entities using search
- Reference one to one
- Cached queries
- Lazy flush
- Data loader
- Log entity changes
- Dirty stream
- Fake delete
- Working with Redis
- Working with local cache
- Working with mysql
- Working with elastic search
- Working with ClickHouse
- Working with Locker
- Query logging
- Logger
- Event broker
- Tools
First you need to define Registry object and register all connection pools to MySQL, Redis and local cache. Use this object to register queues, and entities. You should create this object once when application starts.
package main
import "github.com/summer-solutions/orm"
func main() {
registry := &Registry{}
/*MySQL */
registry.RegisterMySQLPool("root:root@tcp(localhost:3306)/database_name?limit_connections=10") // you should define max connections, default 100
//optionally you can define pool name as second argument
registry.RegisterMySQLPool("root:root@tcp(localhost:3307)/database_name", "second_pool")
registry.DefaultEncoding("utf8") //optional, default is utf8mb4
/* Redis */
registry.RegisterRedis("localhost:6379", 0)
//optionally you can define pool name as second argument
registry.RegisterRedis("localhost:6379", 1, "second_pool")
/* Redis sentinel */
registry.RegisterRedisSentinel("mymaster", 0, []string{":26379", "192.23.12.33:26379", "192.23.12.35:26379"})
// redis database number set to 2
registry.RegisterRedisSentinel("mymaster", 2, []string{":26379", "192.23.12.11:26379", "192.23.12.12:26379"}, "second_pool")
/* Local cache (in memory) */
registry.RegisterLocalCache(1000) //you need to define cache size
//optionally you can define pool name as second argument
registry.RegisterLocalCache(100, "second_pool")
/* Redis used to handle locks (explained later) */
registry.RegisterRedis("localhost:6379", 4, "lockers_pool")
registry.RegisterLocker("default", "lockers_pool")
/* ElasticSearch */
registry.RegisterElastic("http://127.0.0.1:9200")
//optionally you can define pool name as second argument
registry.RegisterElastic("http://127.0.0.1:9200", "second_pool")
// you can enable trace log
registry.RegisterElasticWithTraceLog("http://127.0.0.1:9200", "second_pool")
/* ClickHouse */
registry.RegisterClickHouse("http://127.0.0.1:9000")
//optionally you can define pool name as second argument
registry.RegisterClickHouse("http://127.0.0.1:9000", "second_pool")
}
default:
mysql: root:root@tcp(localhost:3310)/db
mysqlEncoding: utf8 //optional, default is utf8mb4
redis: localhost:6379:0
streams:
stream-1:
- test-group-1
- test-group-2
elastic: http://127.0.0.1:9200
elastic_trace: http://127.0.0.1:9201 //with trace log
clickhouse: http://127.0.0.1:9000
locker: default
local_cache: 1000
second_pool:
mysql: root:root@tcp(localhost:3311)/db2
sentinel:
master:1:
- :26379
- 192.156.23.11:26379
- 192.156.23.12:26379
package main
import (
"github.com/summer-solutions/orm"
"gopkg.in/yaml.v2"
"io/ioutil"
)
func main() {
yamlFileData, err := ioutil.ReadFile("./yaml")
if err != nil {
//...
}
var parsedYaml map[string]interface{}
err = yaml.Unmarshal(yamlFileData, &parsedYaml)
registry := InitByYaml(parsedYaml)
}
package main
import (
"github.com/summer-solutions/orm"
"time"
)
func main() {
type AddressSchema struct {
Street string
Building uint16
}
type colors struct {
Red string
Green string
Blue string
Yellow string
Purple string
}
var Colors = &colors{
orm.EnumModel,
Red: "Red",
Green: "Green",
Blue: "Blue",
Yellow: "Yellow",
Purple: "Purple",
}
type testEntitySchema struct {
orm.ORM
ID uint
Name string `orm:"length=100;index=FirstIndex"`
NameNullable string `orm:"length=100;index=FirstIndex"`
BigName string `orm:"length=max;required"`
Uint8 uint8 `orm:"unique=SecondIndex:2,ThirdIndex"`
Uint24 uint32 `orm:"mediumint=true"`
Uint32 uint32
Uint32Nullable *uint32
Uint64 uint64 `orm:"unique=SecondIndex"`
Int8 int8
Int16 int16
Int32 int32
Int64 int64
Rune rune
Int int
IntNullable *int
Bool bool
BoolNullable *bool
Float32 float32
Float64 float64
Float64Nullable *float64
Float32Decimal float32 `orm:"decimal=8,2"`
Float64DecimalSigned float64 `orm:"decimal=8,2;unsigned=false"`
Enum string `orm:"enum=orm.colorEnum"`
EnumNotNull string `orm:"enum=orm.colorEnum;required"`
Set []string `orm:"set=orm.colorEnum"`
YearNullable *uint16 `orm:"year=true"`
YearNotNull uint16 `orm:"year=true"`
Date *time.Time
DateNotNull time.Time
DateTime *time.Time `orm:"time=true"`
DateTimeNotNull time.Time `orm:"time=true"`
Address AddressSchema
Json interface{}
ReferenceOne *testEntitySchemaRef
ReferenceOneCascade *testEntitySchemaRef `orm:"cascade"`
ReferenceMany []*testEntitySchemaRef
IgnoreField []time.Time `orm:"ignore"`
Blob []byte
MediumBlob []byte `orm:"mediumblob=true"`
LongBlob []byte `orm:"longblob=true"`
FieldAsJson map[string]string
}
type testEntitySchemaRef struct {
orm.ORM
ID uint
Name string
}
type testEntitySecondPool struct {
orm.ORM `orm:"mysql=second_pool"`
ID uint
}
registry := &Registry{}
var testEntitySchema testEntitySchema
var testEntitySchemaRef testEntitySchemaRef
var testEntitySecondPool testEntitySecondPool
registry.RegisterEntity(testEntitySchema, testEntitySchemaRef, testEntitySecondPool)
registry.RegisterEnumStruct("color", Colors)
// now u can use:
Colors.GetDefault() // "Red" (first field)
Colors.GetFields() // ["Red", "Blue" ...]
Colors.GetMapping() // map[string]string{"Red": "Red", "Blue": "Blue"}
Colors.Has("Red") //true
Colors.Has("Orange") //false
//or register enum from slice
registry.RegisterEnumSlice("color", []string{"Red", "Blue"})
validatedRegistry.GetEnum("color").GetFields()
validatedRegistry.GetEnum("color").Has("Red")
//or register enum from map
registry.RegisterEnumMap("color", map[string]string{"red": "Red", "blue": "Blue"}, "red")
}
There are only two golden rules you need to remember defining entity struct:
- first field must be type of "ORM"
- second argument must have name "ID" and must be type of one of uint, uint16, uint32, uint24, uint64, rune
By default entity is not cached in local cache or redis, to change that simply use key "redisCache" or "localCache" in "orm" tag for "ORM" field:
package main
import (
"github.com/summer-solutions/orm"
"time"
)
func main() {
type testEntityLocalCache struct {
orm.ORM `orm:"localCache"` //default pool
//...
}
type testEntityLocalCacheSecondPool struct {
orm.ORM `orm:"localCache=second_pool"`
//...
}
type testEntityRedisCache struct {
orm.ORM `orm:"redisCache"` //default pool
//...
}
type testEntityRedisCacheSecondPool struct {
orm.ORM `orm:"redisCache=second_pool"`
//...
}
type testEntityLocalAndRedisCache struct {
orm.ORM `orm:"localCache;redisCache"`
//...
}
}
Once you created your registry and registered all pools and entities you should validate it. You should also run it once when your application starts.
package main
import "github.com/summer-solutions/orm"
func main() {
registry := &Registry{}
//register pools and entities
validatedRegistry, err := registry.Validate()
}
You need to crete engine to start working with entities (searching, saving). You must create engine for each http request and thread.
package main
import "github.com/summer-solutions/orm"
func main() {
registry := &Registry{}
//register pools and entities
validatedRegistry, err := registry.Validate()
engine := validatedRegistry.CreateEngine()
}
ORM provides useful object that describes entity structrure called TabelSchema:
package main
import "github.com/summer-solutions/orm"
func main() {
registry := &Registry{}
// register
validatedRegistry, err := registry.Validate()
engine := validatatedRegistry.CreateEngine()
alters := engine.GetAlters()
/*optionally you can execute alters for each model*/
var userEntity UserEntity
tableSchema := engine.GetRegistry().GetTableSchemaForEntity(userEntity)
//or
tableSchema := validatedRegistry.GetTableSchemaForEntity(userEntity)
/*checking table structure*/
tableSchema.UpdateSchema(engine) //it will create or alter table if needed
tableSchema.DropTable(engine) //it will drop table if exist
tableSchema.TruncateTable(engine)
tableSchema.UpdateSchemaAndTruncateTable(engine)
has, alters := tableSchema.GetSchemaChanges(engine)
/*getting table structure*/
db := tableSchema.GetMysql(engine)
localCache, has := tableSchema.GetLocalCache(engine)
redisCache, has := tableSchema.GetRedisCache(engine)
columns := tableSchema.GetColumns()
tableSchema.GetTableName()
}
package main
import "github.com/summer-solutions/orm"
func main() {
/* adding */
entity := testEntity{Name: "Name 1"}
engine.Flush(&entity)
entity2 := testEntity{Name: "Name 1"}
entity2.SetOnDuplicateKeyUpdate(orm.Bind{"Counter": 2}, entity2)
engine.Flush(&entity2)
entity2 = testEntity{Name: "Name 1"}
engine.SetOnDuplicateKeyUpdate(orm.Bind{}, entity2) //it will change nothing un row
engine.Flush(&entity)
/*if you need to add more than one entity*/
entity = testEntity{Name: "Name 2"}
entity2 := testEntity{Name: "Name 3"}
flusher := engine.NewFlusher()
flusher.Track(&entity, &entity2)
//it will execute only one query in MySQL adding two rows at once (atomic)
flusher.Flush()
/* editing */
flusher := engine.NewFlusher().Track(&entity, &entity2)
entity.Name = "New name 2"
//you can also use (but it's slower):
entity.SetField("Name", "New name 2")
entity.IsDirty() //returns true
entity2.IsDirty() //returns false
flusher.Flush() //it will save data in DB for all dirty tracked entities
engine.IsDirty(entity) //returns false
/* deleting */
entity2.Delete()
//or
flusher.Delete(&entity, &entity2).Flush()
/* flush will panic if there is any error. You can catch 2 special errors using this method */
err := flusher.FlushWithCheck()
//or
err := flusher.FlushInTransactionWithCheck()
orm.DuplicatedKeyError{} //when unique index is broken
orm.ForeignKeyError{} //when foreign key is broken
/* You can catch all errors using this method */
err := flusher.FlushWithFullCheck()
}
package main
import "github.com/summer-solutions/orm"
func main() {
entity = testEntity{Name: "Name 2"}
entity2 := testEntity{Name: "Name 3"}
flusher := engine.NewFlusher().Track(&entity, &entity2)
// DB transcation
flusher.FlushInTransaction()
// or redis lock
flusher.FlushWithLock("default", "lock_name", 10 * time.Second, 10 * time.Second)
// or DB transcation nad redis lock
flusher.FlushInTransactionWithLock("default", "lock_name", 10 * time.Second, 10 * time.Second)
//manual transaction
db := engine.GetMysql()
db.Begin()
defer db.Rollback()
//run queries
db.Commit()
package main
import "github.com/summer-solutions/orm"
func main() {
var entity testEntity
has := engine.LoadByID(1, &entity)
var entities []*testEntity
missing := engine.LoadByIDs([]uint64{1, 3, 4}, &entities) //missing contains IDs that are missing in database
}
package main
import "github.com/summer-solutions/orm"
func main() {
var entities []*testEntity
pager := orm.NewPager(1, 1000)
where := orm.NewWhere("`ID` > ? AND `ID` < ?", 1, 8)
engine.Search(where, pager, &entities)
//or if you need number of total rows
totalRows := engine.SearchWithCount(where, pager, &entities)
//or if you need only one row
where := onm.NewWhere("`Name` = ?", "Hello")
var entity testEntity
found := engine.SearchOne(where, &entity)
//or if you need only primary keys
ids := engine.SearchIDs(where, pager, entity)
//or if you need only primary keys and total rows
ids, totalRows = engine.SearchIDsWithCount(where, pager, entity)
}
package main
import "github.com/summer-solutions/orm"
func main() {
type UserEntity struct {
ORM
ID uint64
Name string
School *SchoolEntity `orm:"required"` // key is "on delete restrict" by default not not nullable
SecondarySchool *SchoolEntity // key is nullable
}
type SchoolEntity struct {
ORM
ID uint64
Name string
}
type UserHouse struct {
ORM
ID uint64
User *UserEntity `orm:"cascade;required"` // on delete cascade and is not nullable
}
// saving in DB:
user := UserEntity{Name: "John"}
school := SchoolEntity{Name: "Name of school"}
house := UserHouse{Name: "Name of school"}
engine.Track(&user, &school, &house)
user.School = school
house.User = user
engine.Flush()
// loading references:
_ = engine.LoadById(1, &user)
user.School != nil //returns true, School has ID: 1 but other fields are nof filled
user.School.ID == 1 //true
user.School.Loaded() //false
user.Name == "" //true
user.School.Load(engine) //it will load school from db
user.School.Loaded() //now it's true, you can access school fields like user.School.Name
user.Name == "Name of school" //true
//If you want to set reference and you have only ID:
user.School = &SchoolEntity{ID: 1}
// detaching reference
user.School = nil
// preloading references
engine.LoadByID(1, &user, "*") //all references
engine.LoadByID(1, &user, "School") //only School
engine.LoadByID(1, &user, "School", "SecondarySchool") //only School and SecondarySchool
engine.LoadByID(1, &userHouse, "User/School", "User/SecondarySchool") //User, School and SecondarySchool in each User
engine.LoadByID(1, &userHouse, "User/*") // User, all references in User
engine.LoadByID(1, &userHouse, "User/*/*") // User, all references in User and all references in User subreferences
//You can have as many levels you want: User/School/AnotherReference/EvenMore/
//You can preload referenes in all search and load methods:
engine.LoadByIDs()
engine.Search()
engine.SearchOne()
engine.CachedSearch()
...
}
package main
import "github.com/summer-solutions/orm"
func main() {
//Fields that needs to be tracked for changes should start with ":"
type UserEntity struct {
ORM
ID uint64
Name string
Age uint16
IndexAge *CachedQuery `query:":Age = ? ORDER BY :ID"`
IndexAll *CachedQuery `query:""` //cache all rows
IndexName *CachedQuery `queryOne:":Name = ?"`
}
pager := orm.NewPager(1, 1000)
var users []*UserEntity
var user UserEntity
totalRows := engine.CachedSearch(&users, "IndexAge", pager, 18)
totalRows = engine.CachedSearch(&users, "IndexAll", pager)
has := engine.CachedSearchOne(&user, "IndexName", "John")
}
Sometimes you want to flush changes in database, but it's ok if data is flushed after some time. For example when you want to save some logs in database.
package main
import "github.com/summer-solutions/orm"
func main() {
// you need to register redis
registry.RegisterRedis("localhost:6379", 0)
registry.RegisterRedis("localhost:6380", 0, "another_redis")
// .. create engine
type User struct {
ORM `orm:"log"`
ID uint
Name string
Age int `orm:"skip-log"` //Don't track this field
}
// optionally you can set optional redis pool used to queue all events
type Dog struct {
ORM `orm:"asyncRedisLazyFlush=another_redis"`
ID uint
Name string
}
// now in code you can use FlushLazy() methods instead of Flush().
// it will send changes to queue (database and cached is not updated yet)
engine.FlushLazy()
// you need to run code that will read data from queue and execute changes
// run in separate goroutine (cron script)
consumer := NewAsyncConsumer(engine, "my-consumer", 1) // you can run maximum one consumer
consumer.Digest() //It will wait for new messages in a loop, run receiver.DisableLoop() to run loop once
consumerAnotherPool := NewAsyncConsumer(engine, "my-consumer", 5) // you can run up to 5 consumers at the same time
consumerAnotherPool.Digets()
}
It's a good practice to cache entities in one short request (e.g. http request) to reduce number of requests to databases.
If you are using more than one goroutine (for example in GraphQL backend implementation) you can enable Data loader in engine to group many queries into one and reduce number of queries. You can read more about idea behind it here.
package main
import "github.com/summer-solutions/orm"
func main() {
engine.EnableDataLoader(true)
}
Otherwise set false and all entities will be cached in a simple temporary cache:
package main
import "github.com/summer-solutions/orm"
func main() {
engine.EnableDataLoader(false)
}
ORM can store in database every change of entity in special log table.
package main
import "github.com/summer-solutions/orm"
func main() {
//it's recommended to keep logs in separated DB
registry.RegisterMySQLPool("root:root@tcp(localhost:3306)/log_database", "log_db_pool")
// you need to register default Redis
registry.RegisterRedis("localhost:6379", 0)
registry.RegisterRedis("localhost:6380", 0, "another_redis")
//next you need to define in Entity that you want to log changes. Just add "log" tag or define mysql pool name
type User struct {
ORM `orm:"log"`
ID uint
Name string
Age int `orm:"skip-log"` //Don't track this field
}
// optionally you can set optional redis pool used to queue all events
type Dog struct {
ORM `orm:"log=log_db_pool;asyncRedisLogs=another_redis"`
ID uint
Name string
}
// Now every change of User will be saved in log table
// You can add extra data to log, simply use this methods before Flush():
engine.SetLogMetaData("logged_user_id", 12)
engine.SetLogMetaData("ip", request.GetUserIP())
// you can set meta only in specific entity
engine.SetEntityLogMeta("user_name", "john", entity)
consumer := NewAsyncConsumer(engine, "my-consumer", 1)
consumer.Digets() //it will wait for new messages in queue
consumerAnotherPool := NewAsyncConsumer(engine, "my-consumer", 1)
consumerAnotherPool.Digets()
}
You can send event to event broker if any specific data in entity was changed.
package main
import "github.com/summer-solutions/orm"
func main() {
//define at least one redis pool
registry.RegisterRedis("localhost:6379", 0, "event-broker-pool")
//define stream with consumer groups for events
registry.RegisterRedisStream("user_changed", "event-broker-pool", []string{"my-consumer-group"})
registry.RegisterRedisStream("age_name_changed", "event-broker-pool", []string{"my-consumer-group"})
registry.RegisterRedisStream("age_changed", "event-broker-pool", []string{"my-consumer-group"})
// create engine
// next you need to define in Entity that you want to log changes. Just add "dirty" tag
type User struct {
orm.ORM `orm:"dirty=user_changed"` //define dirty here to track all changes
ID uint
Name string `orm:"dirty=age_name_changed"` //event will be send to age_name_changed if Name or Age changed
Age int `orm:"dirty=age_name_changed,age_changed"` //event will be send to age_changed if Age changed
}
consumer := engine.GetEventBroker().Consume("my-consumer", "my-consumer-group", 1)
consumer.Consume(context.Background(), 100, func(events []orm.Event) {
for _, event := range events {
dirty := orm.EventDirtyEntity(event) // created wrapper around event to easily access data
if dirty.Added() {
fmt.Printf("Entity %s with ID %d was added", dirty.TableSchema().GetType().String(), dirty.ID())
} else if dirty.Updated() {
fmt.Printf("Entity %s with ID %d was updated", dirty.TableSchema().GetType().String(), dirty.ID())
} else if dirty.Deleted() {
fmt.Printf("Entity %s with ID %d was deleted", dirty.TableSchema().GetType().String(), dirty.ID())
}
event.Ack()
}
})
}
If you want to keep deleted entity in database but ny default this entity should be excluded from all engine.Search() and engine.CacheSearch() queries you can use FakeDelete column. Simply create field bool with name "FakeDelete".
func main() {
type UserEntity struct {
ORM
ID uint64
Name string
FakeDelete bool
}
//you can delete in two ways:
engine.Delete(user) -> will set user.FakeDelete = true
//or:
user.FakeDelete = true
engine.Flush(user) //it will save entity id in Column `FakeDelete`.
//will return all rows where `FakeDelete` = 0
total, err = engine.SearchWithCount(NewWhere("1"), nil, &rows)
//To force delete (remove row from DB):
engine.ForceDelete(user)
}
package main
import "github.com/summer-solutions/orm"
func main() {
config.RegisterRedis("localhost:6379", 0)
//storing data in cached for x seconds
val := engine.GetRedis().GetSet("key", 1, func() interface{} {
return "hello"
})
//standard redis api
keys := engine.GetRedis().LRange("key", 1, 2)
engine.GetRedis().LPush("key", "a", "b")
//...
//rete limiter
valid := engine.GetRedis().RateLimit("resource_name", redis_rate.PerMinute(10))
}
package main
import "github.com/summer-solutions/orm"
func main() {
registry.RegisterLocalCache(1000)
//storing data in cached for x seconds
val := engine.GetLocalCache().GetSet("key", 1, func() interface{} {
return "hello"
})
//getting value
value, has := engine.GetLocalCache().Get("key")
//getting many values
values := engine.GetLocalCache().MGet("key1", "key2")
//setting value
engine.GetLocalCache().Set("key", "value")
//setting values
engine.GetLocalCache().MSet("key1", "value1", "key2", "value2" /*...*/)
//getting values from hash set (like redis HMGET)
values = engine.GetLocalCache().HMget("key")
//setting values in hash set
engine.GetLocalCache().HMset("key", map[string]interface{}{"key1" : "value1", "key2": "value 2"})
//deleting value
engine.GetLocalCache().Remove("key1", "key2" /*...*/)
//clearing cache
engine.GetLocalCache().Clear()
}
package main
import (
"database/sql"
"github.com/summer-solutions/orm"
)
func main() {
// register mysql pool
registry.RegisterMySQLPool("root:root@tcp(localhost:3306)/database_name")
res := engine.GetMysql().Exec("UPDATE `table_name` SET `Name` = ? WHERE `ID` = ?", "Hello", 2)
var row string
found := engine.GetMysql().QueryRow(orm.NewWhere("SELECT * FROM `table_name` WHERE `ID` = ?", 1), &row)
results, def := engine.GetMysql().Query("SELECT * FROM `table_name` WHERE `ID` > ? LIMIT 100", 1)
defer def()
for results.Next() {
var row string
results.Scan(&row)
}
def() //if it's not last line in this method
}
package main
import (
"github.com/summer-solutions/orm"
)
func main() {
type TestIndex struct {
}
func (i *TestIndex) GetName() string {
return "test_index"
}
func (i *TestIndex) GetDefinition() map[string]interface{} {
return map[string]interface{}{
"settings": map[string]interface{}{
"number_of_replicas": "1",
"number_of_shards": "1",
},
"mappings": map[string]interface{}{
"properties": map[string]interface{}{
"Name": map[string]interface{}{
"type": "keyword",
"normalizer": "case_insensitive",
},
},
},
}
}
// register elastic search pool and index
registry.RegisterElastic("http://127.0.0.1:9200")
registry.RegisterElasticIndex(&TestIndex{})
e := engine.GetElastic()
// create indices
for _, alter := range engine.GetElasticIndexAlters() {
// alter.Safe is true if index does not exists or is not empty
engine.GetElastic(alter.Pool).CreateIndex(alter.Index)
}
query := elastic.NewBoolQuery()
query.Must(elastic.NewTermQuery("user_id", 12))
options := &orm.SearchOptions{}
options.AddSort("created_at", true).AddSort("name", false)
results := e.Search("users", query, orm.NewPager(1, 10), options)
}
package main
import (
"github.com/summer-solutions/orm"
)
func main() {
// register elastic search pool
registry.RegisterClickHouse("http://127.0.0.1:9000")
ch := engine.GetClickHouse()
ch.Exec("INSERT INTO `table` (name) VALUES (?)", "hello")
statement, def := ch.Prepare("INSERT INTO `table` (name) VALUES (?)")
defer def()
statement.Exec("hello")
statement.Exec("hello 2")
rows, def := ch.Queryx("SELECT FROM `table` WHERE x = ? AND y = ?", 1, "john")
defer def()
for rows.Next() {
m := &MyStruct{}
err := rows.StructScan(m)
}
ch.Begin()
defer ch.Rollback()
// run queries
defer ch.Commit()
}
Shared cached that is using redis
package main
import "github.com/summer-solutions/orm"
func main() {
// register redis and locker
registry.RegisterRedis("localhost:6379", 0, "my_pool")
registry.RegisterLocker("default", "my_pool")
locker, _ := engine.GetLocker()
lock := locker.Obtain("my_lock", 5 * Time.Second, 1 * Time.Second)
defer lock.Release()
// do smth
ttl := lock.TTL()
if ttl == 0 {
panic("lock lost")
}
}
You can log all queries:
- queries to MySQL database (insert, select, update)
- requests to Redis
- requests to Elastic Search
- queries to CickHouse
package main
import "github.com/summer-solutions/orm"
func main() {
//enable human friendly console log
engine.EnableQueryDebug() //MySQL, redis, Elastic Search, ClickHouse queries (local cache in excluded bt default)
engine.EnableQueryDebug(orm.QueryLoggerSourceRedis, orm.QueryLoggerSourceLocalCache)
//adding custom logger example:
engine.AddQueryLogger(json.New(os.Stdout), log.LevelWarn) //MySQL, redis warnings and above
engine.AddQueryLogger(es.New(os.Stdout), log.LevelError, orm.QueryLoggerSourceRedis)
}
package main
import "github.com/summer-solutions/orm"
func main() {
//enable json logger with proper level
engine.EnableLogger(log.InfoLevel)
//or enable human friendly console logger
engine.EnableDebug()
//you can add special fields to all logs
engine.Log().AddFields(log.Fields{"user_id": 12, "session_id": "skfjhfhhs1221"})
//printing logs
engine.Log().Warn("message", nil)
engine.Log().Debug("message", log.Fields{"user_id": 12})
engine.Log().Error(err, nil)
engine.Log().ErrorMessage("ups, that is strange", nil)
//handling recovery
if err := recover(); err != nil {
engine.Log().Error(err, nil)
}
//filling log with data from http.Request
engine.Log().AddFieldsFromHTTPRequest(request, "197.21.34.22")
}
ORM provides easy way to use event broker.
First yuo need to define streams and consumer groups:
#YAML config file
default:
redis: localhost:6381:0 // redis is required
streams:
stream-1:
- test-group-1
- test-group-2
stream-2:
- test-group-1
stream-3:
- test-group-3
or using go:
package main
import "github.com/summer-solutions/orm"
func main() {
registry := &orm.Registry{}
registry.RegisterRedisStream("stream-1", "default", []string{"test-group-1", "test-group-2"})
registry.RegisterRedisStream("stream-2", "default", []string{"test-group-1"})
registry.RegisterRedisStream("stream-3", "default", []string{"test-group-3"})
}
Publishing and receiving events :
package main
import (
"context"
"github.com/summer-solutions/orm"
)
func main() {
// .. create engine
type Person struct {
Name string
Age int
}
// fastest, no serialization
engine.GetEventBroker().PublishMap("stream-1", orm.EventAsMap{"key": "value", "anotherKey": "value 2"})
// using serialization
engine.GetEventBroker().Publish("stream-3", Person{Name: "Adam", Age: 18})
// publishing many at once, recommended because it's much faster than one by one
flusher := engine.GetEventBroker().NewFlusher()
flusher.PublishMap("stream-1", orm.EventAsMap{"key": "value", "anotherKey": "value 2"})
flusher.Publish("stream-1", Person{Name: "Adam", Age: 18})
flusher.Flush()
// reading from "stream-1" and "stream-2" streams, you can run max one consumer at once
consumerTestGroup1 := engine.GetEventBroker().Consume("my-consumer", "test-group-1", 1)
// reading max 100 events in one loop, this line stop execution, waiting for new events
consumerTestGroup1.Consume(context.Background(), 100, func(events []orm.Event) {
for _, event := range events {
values := event.RawData() // map[string]interface{}{"key": "value", "anotherKey": "value 2"}
//do some work
event.Ack() // this line is acknowledging event
}
})
// auto acknowledging
consumerTestGroup1.Consume(context.Background(), 100, func(events []orm.Event) {
//do some work, for example put all events at once to DB
// in this example all events will be acknowledge when this method is finished
})
// skipping some events
consumerTestGroup1.Consume(context.Background(), 100, func(events []orm.Event) {
for _, event := range events {
if someCondition {
event.Ack()
} else {
event.Skip() //this event will be consumed again later
}
}
})
// reading from "stream-3" stream, you can run max to two consumers at once
consumerTestGroup3 := engine.GetEventBroker().Consume("my-consumer", "test-group-2", 2)
consumerTestGroup3.DisableLoop() // all events will be consumed once withour waiting for new events
consumerTestGroup3.Consume(context.Background(), 100, func(events []orm.Event) {
var person Person
for _, event := range events {
err := event.Unserialize(&person)
if err != nil {
// ...
}
//do some work
event.Ack() // this line is acknowledging event
}
})
}
Setting another redis pool for AsyncConsumer:
another_pool:
redis: localhost:6381:0
streams:
orm-lazy-channel: # FlushLazy()
- default-group # group name for AsyncConsumer
orm-log-channel: # adding changes to logs
- default-group # group name for AsyncConsumer
- test-group-2 # you can register another consumers to read logs
Redis streams statistics
package main
import "github.com/summer-solutions/orm/tools"
func main() {
stats := tools.GetRedisStreamsStatistics(engine)
}