Kai Weber
07/06/2020, 1:50 PMnicholas
Kai Weber
07/06/2020, 2:33 PMnicholas
Kai Weber
07/06/2020, 4:11 PMdef Start_MQTT_MessageBroker(eclmosq_DI, eclmosq_Name, srvNet):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("TASK **************************************************************")
<http://logger.info|logger.info>("** Starting MQTT Server")
<http://logger.info|logger.info>("** ====================")
<http://logger.info|logger.info>("** Docker-Image: {}".format(eclmosq_DI))
<http://logger.info|logger.info>("** Server-Name: {}".format(eclmosq_Name))
<http://logger.info|logger.info>("** Netzwerkname: {}".format(srvNet))
<http://logger.info|logger.info>("*******************************************************************")
strPull=PullImage(repository=eclmosq_DI)
<http://logger.info|logger.info>("** PULL IMAGE: %s ", strPull.__dict__.items())
strList=ListImages()
<http://logger.info|logger.info>("** LIST IMAGE: %s ", strList.__dict__.items())
First step, I try to pull the current Image "eclipse-mosquitto"nicholas
Kai Weber
07/06/2020, 4:45 PMimport os
import prefect
from prefect import Flow, task, Parameter
from prefect.tasks.docker import (
CreateContainer,
StartContainer,
GetContainerLogs,
WaitOnContainer,
ListImages,
PullImage
)
from prefect.tasks.control_flow import ifelse
@task
def Load_ServerConfiguration(devPath):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("TASK **************************************************************")
<http://logger.info|logger.info>("Reading configuration data (Parameters)")
<http://logger.info|logger.info>("Serverumgebung: {}".format(devPath))
#<http://logger.info|logger.info>("Serverumgebung: ", devPath)
#for root, dirs, files in os.walk("."):
# for filename in files:
# <http://logger.info|logger.info>(filename)
<http://logger.info|logger.info>("*******************************************************************")
@task
def Start_MQTT_MessageBroker(eclmosq_DI, eclmosq_Name, srvNet):
logger = prefect.context.get("logger")
<http://logger.info|logger.info>("TASK **************************************************************")
<http://logger.info|logger.info>("** Starting MQTT Server")
<http://logger.info|logger.info>("** ====================")
<http://logger.info|logger.info>("** Docker-Image: {}".format(eclmosq_DI))
<http://logger.info|logger.info>("** Server-Name: {}".format(eclmosq_Name))
<http://logger.info|logger.info>("** Netzwerkname: {}".format(srvNet))
<http://logger.info|logger.info>("*******************************************************************")
strPull=PullImage(repository=eclmosq_DI)
<http://logger.info|logger.info>("** PULL IMAGE: %s ", strPull.__dict__.items())
strList=ListImages()
<http://logger.info|logger.info>("** LIST IMAGE: %s ", strList.__dict__.items())
return [1, 2, 3]
@task
def transform(x):
return [i * 10 for i in x]
@task
def load(y):
print("Received y: {}".format(y))
with Flow("BASIC_SpinOffServers") as flow:
devPath = Parameter("ServerEnv", default="~")
srvNet = Parameter("ServerNet", default="prefect-server")
eclmosq_DI = Parameter ("EclMosq_DI", default="eclipse-mosquitto")
eclmosq_Name = Parameter ("EclMosq_Name", default="MQTT_Machine")
# print("XXX:", airport)
Load_ServerConfiguration(devPath)
e = Start_MQTT_MessageBroker(srvNet, eclmosq_DI, eclmosq_Name)
#t = transform(e)
#l = load(t)
#flow.register()
#flow.run()
flow.run(ServerEnv="C:/Projekte/Lokal/software/develop/GitHub/SuperAIDev", ServerNet="prefect-server", EclMosq_DI = "eclipse-mosquitto", EclMosq_Name="MQTT_Machine")
Kai Weber
07/06/2020, 4:46 PMKai Weber
07/06/2020, 4:46 PMnicholas
PullImage
and ListImage
are Prefect tasks, you'll want to call those from within the Flow context, instead of within another task.Kai Weber
07/06/2020, 4:57 PMnicholas
from prefect import Flow, task, Parameter
from prefect.tasks.docker import (
CreateContainer,
StartContainer,
GetContainerLogs,
WaitOnContainer,
ListImages,
PullImage
)
with Flow("BASIC_SpinOffServers" as flow:
eclmosq_DI = Parameter ("EclMosq_DI", default="eclipse-mosquitto")
pullImage = PullImage(repository=eclmosq_DI)
listImages = ListImages()
...
Kai Weber
07/07/2020, 1:55 PMTask
classes, including Parameters
, are instantiated inside a with flow:
block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
"Tasks were created but not added to the flow: "
[2020-07-07 135148] INFO - prefect.TaskRunner | Task 'ServerEnv': Starting task run...Kai Weber
07/07/2020, 1:57 PMwith Flow("BASIC_SpinOffServers") as flow:
eclmosq_DI = Parameter ("EclMosq_DI", default="eclipse-mosquitto")
pullImage = PullImage(repository=eclmosq_DI)
listImages = ListImages()
Load_ServerConfiguration(devPath)
e = Start_MQTT_MessageBroker(eclmosq_DI=eclmosq_DI, eclmosq_Name=eclmosq_Name, srvNet=srvNet)
print (images.result)
print (flow.tasks)
flow.run(parameters=dict(ServerEnv="C:/Projekte/Lokal/software/develop/GitHub/SuperAIDev", ServerNet="prefect-server",
EclMosq_DI="eclipse-mosquitto", EclMosq_Name="MQTT_Machine"))