# kratos-cronjob

**Repository Path**: ploynomail/kratos-cronjob

## Basic Information

- **Project Name**: kratos-cronjob
- **Description**: kratos 框架的job server
- **Primary Language**: Unknown
- **License**: Not specified
- **Default Branch**: master
- **Homepage**: None
- **GVP Project**: No

## Statistics

- **Stars**: 0
- **Forks**: 0
- **Created**: 2023-02-01
- **Last Updated**: 2025-02-20

## Categories & Tags

**Categories**: Uncategorized

**Tags**: None

## README

# NOTE: 这个go mod是为go-kratos框架Cron Job简单编排

# Install

```shell
go get gitee.com/ploynomail/kratos-cronjob
```

# Use this module in kratos

## Install Kratos

[参考:kratos doc](https://go-kratos.dev/docs/getting-started/usage)

Note: 下面的目录都相对于kratos的创建目录

## Create cron job server

### kratos path

```shell
helloworld/internal/server/
```

### create file job.go

```go
package server

import (
	"helloworld/internal/tasks"

	kratoscronjob "gitee.com/ploynomail/kratos-cronjob"
	"github.com/go-kratos/kratos/v2/log"
	"github.com/robfig/cron/v3"
)

func NewJobServer(logger log.Logger, job *tasks.ExampleJob) *kratoscronjob.Server {
	srv := kratoscronjob.NewServer(
		logger,
		[]kratoscronjob.ServerOption{
			kratoscronjob.WithAPiTrigger(),
		},
		[]cron.Option{
			cron.WithSeconds(),
		},
	)
	srv.RegisterJob(job)
	return srv
}
```

### register job server

```go
// cat helloworld/internal/server/server.go
package server

import (
	"github.com/google/wire"
)

// ProviderSet is server providers.
var ProviderSet = wire.NewSet(NewGRPCServer, NewHTTPServer, NewJobServer)
```

## Create Kratos tasks

```shell
mkdir helloworld/internal/tasks
```

### register tasks ProviderSet

```go
// cat helloworld/internal/tasks/tasks.go
package tasks

import "github.com/google/wire"

// ProviderSet is task providers.
var ProviderSet = wire.NewSet(NewExampleJob)
```

### newApp change

```go
// cat helloworld/cmd/helloworld/main.go#37
func newApp(logger log.Logger, gs *grpc.Server, hs *http.Server, js *kratoscronjob.Server) *kratos.App {
	return kratos.New(
		kratos.ID(id),
		kratos.Name(Name),
		kratos.Version(Version),
		kratos.Metadata(map[string]string{}),
		kratos.Logger(logger),
		kratos.Server(
			gs,
			hs,
			js,
		),
	)
}
```

### wire.go

```shell
cat helloworld/cmd/helloworld/wire.go
```

```go
//go:build wireinject
// +build wireinject

// The build tag makes sure the stub is not built in the final build.

package main

import (
	"helloworld/internal/biz"
	"helloworld/internal/conf"
	"helloworld/internal/data"
	"helloworld/internal/server"
	"helloworld/internal/service"
	"helloworld/internal/tasks"

	"github.com/go-kratos/kratos/v2"
	"github.com/go-kratos/kratos/v2/log"
	"github.com/google/wire"
)

// wireApp init kratos application.
func wireApp(*conf.Server, *conf.Data, log.Logger) (*kratos.App, func(), error) {
	panic(wire.Build(server.ProviderSet, data.ProviderSet, biz.ProviderSet, service.ProviderSet, tasks.ProviderSet, newApp))
}
```

## Create an example job

```go
// cat helloworld/internal/tasks/example.go
package tasks

import (
	"context"
	"fmt"

	kratoscronjob "gitee.com/ploynomail/kratos-cronjob"
)

type ExampleJob struct {
}

func NewExampleJob() *ExampleJob {
	job := &ExampleJob{}
	return job
}

// The following methods are required for registering the job.

// Name returns the job name.
func (e *ExampleJob) Name() string {
	return "examplejob"
}

// Spec returns the job schedule string, like * * * * * * (six fields; the first is seconds).
func (e *ExampleJob) Spec() string {
	return "* * * * * *"
}

// Action returns the func that is run each time the job fires.
func (e *ExampleJob) Action() func() {
	return func() {
		var completedAction map[string]string = make(map[string]string)
		tasks := CompareWorkFlow()
		tasks.StartWithContext(context.Background(), completedAction)
		tasks.WaitDone()
		fmt.Println(completedAction)
		fmt.Printf("Run job name:%s spec:%s tags:%s owner:%s\n", e.Name(), e.Spec(),
			e.Tags(), e.Owner())
	}
}

func (e *ExampleJob) Tags() []string {
	return []string{"example", "first job"}
}

func (e *ExampleJob) Owner() string {
	return "hongmin"
}

func (e *ExampleJob) Info() map[string]interface{} {
	var info map[string]interface{} = make(map[string]interface{})
	info["info1"] = "info message"
	return info
}

// The following types are the task items of the workflow.
// Execution order: A -> B,C -> D -> E
type DagTaskA struct {
	name   string
	action string
}

func (t *DagTaskA) Run(i interface{}) error {
	x := i.(map[string]string)
	x[t.name] = t.action
	fmt.Printf("task: %s action: %s\n", t.name, t.action)
	return nil
}

type DagTaskB struct {
	name   string
	action string
}

func (t *DagTaskB) Run(i interface{}) error {
	x := i.(map[string]string)
	x[t.name] = t.action
	fmt.Printf("task: %s action: %s\n", t.name, t.action)
	return nil
}

type DagTaskC struct {
	name   string
	action string
}

func (t *DagTaskC) Run(i interface{}) error {
	x := i.(map[string]string)
	x[t.name] = t.action
	fmt.Printf("task: %s action: %s\n", t.name, t.action)
	return nil
}

type DagTaskD struct {
	name   string
	action string
}

func (t *DagTaskD) Run(i interface{}) error {
	x := i.(map[string]string)
	x[t.name] = t.action
	fmt.Printf("task: %s action: %s\n", t.name, t.action)
	return nil
}

type DagTaskE struct {
	name   string
	action string
}

func (t *DagTaskE) Run(i interface{}) error {
	x := i.(map[string]string)
	x[t.name] = t.action
	fmt.Printf("task: %s action: %s\n", t.name, t.action)
	return nil
}

// CompareWorkFlow builds the DAG workflow from the task nodes.
func CompareWorkFlow() *kratoscronjob.WorkFlow {
	wrf := kratoscronjob.NewWorkFlow()
	ANode := kratoscronjob.NewNode(&DagTaskA{
		name:   "taskA",
		action: "pull code",
	})
	BNode := kratoscronjob.NewNode(&DagTaskB{
		name:   "taskB",
		action: "install front",
	})
	CNode := kratoscronjob.NewNode(&DagTaskC{
		name:   "taskC",
		action: "install backend",
	})
	DNode := kratoscronjob.NewNode(&DagTaskD{
		name:   "taskD",
		action: "build docker image",
	})
	ENode := kratoscronjob.NewNode(&DagTaskE{
		name:   "taskE",
		action: "pull docker image",
	})
	wrf.AddStartNode(ANode)
	wrf.AddEdge(ANode, BNode)
	wrf.AddEdge(ANode, CNode)
	wrf.AddEdge(BNode, DNode)
	wrf.AddEdge(CNode, DNode)
	wrf.AddEdge(DNode, ENode)
	wrf.ConnectToEnd(ENode)
	return wrf
}
```

## Run

```shell
[root@coding-mac helloworld]# make init
[root@coding-mac helloworld]# make generate
[root@coding-mac helloworld]# make build
[root@coding-mac helloworld]# ./bin/helloworld -conf configs/config.yaml
2023/02/01 15:53:39 maxprocs: Leaving GOMAXPROCS=2: CPU quota undefined
DEBUG msg=config loaded: config.yaml format: yaml
INFO ts=2023-02-01T15:53:39+08:00 caller=kratos-cronjob@v0.0.2/server.go:48 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=Reginster task: examplejob, task id: 1,
INFO ts=2023-02-01T15:53:39+08:00 caller=kratos-cronjob@v0.0.2/server.go:55 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=CronJob server open api trigger.
INFO ts=2023-02-01T15:53:39+08:00 caller=kratos-cronjob@v0.0.2/server.go:57 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=Cron job server run: 1 job count
INFO ts=2023-02-01T15:53:39+08:00 caller=http/server.go:303 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=[HTTP] server listening on: [::]:8000
INFO ts=2023-02-01T15:53:39+08:00 caller=grpc/server.go:207 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=[gRPC] server listening on: [::]:9000
task: taskA action: pull code
task: taskB action: install front
task: taskC action: install backend
task: taskD action: build docker image
task: taskE action: pull docker image
map[taskA:pull code taskB:install front taskC:install backend taskD:build docker image taskE:pull docker image]
Run job name:examplejob spec:* * * * * * tags:[example first job] owner:hongmin
task: taskA action: pull code
task: taskB action: install front
task: taskC action: install backend
task: taskD action: build docker image
task: taskE action: pull docker image
map[taskA:pull code taskB:install front taskC:install backend taskD:build docker image taskE:pull docker image]
Run job name:examplejob spec:* * * * * * tags:[example first job] owner:hongmin
task: taskA action: pull code
task: taskB action: install front
task: taskC action: install backend
task: taskD action: build docker image
task: taskE action: pull docker image
map[taskA:pull code taskB:install front taskC:install backend taskD:build docker image taskE:pull docker image]
Run job name:examplejob spec:* * * * * * tags:[example first job] owner:hongmin
task: taskA action: pull code
task: taskB action: install front
task: taskC action: install backend
task: taskD action: build docker image
task: taskE action: pull docker image
map[taskA:pull code taskB:install front taskC:install backend taskD:build docker image taskE:pull docker image]
Run job name:examplejob spec:* * * * * * tags:[example first job] owner:hongmin
^CINFO ts=2023-02-01T15:53:43+08:00 caller=http/server.go:318 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=[HTTP] server stopping
INFO ts=2023-02-01T15:53:43+08:00 caller=kratos-cronjob@v0.0.2/server.go:63 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=[Cronjob] server stop.
INFO ts=2023-02-01T15:53:43+08:00 caller=grpc/server.go:219 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=[gRPC] server stopping
INFO ts=2023-02-01T15:53:43+08:00 caller=data/data.go:21 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=closing the data resources
INFO ts=2023-02-01T15:53:43+08:00 caller=config/config.go:67 service.id=coding-mac service.name= service.version= trace.id= span.id= msg=watcher's ctx cancel : context canceled
```