Kai Weber

07/06/2020, 1:50 PM
Hi, can anybody help me with the Docker tasks? I want to design a flow to renew a Docker image: 1. pull the latest MQTT image (docker pull eclipse-mosquitto), 2. remove the running MQTT container (docker rm MQTT-NAME), 3. start the new MQTT container (docker run -tid -p 1883:1883 -p 9001:9001 --name=msg-broker --restart=always --network %x_Network% eclipse-mosquitto). How do I express that in Prefect? I can't even pull the latest image. Thanks a lot!

nicholas

07/06/2020, 2:11 PM
Hi @Kai Weber - what are you trying currently? There are a couple of ways to handle this, either by using docker tasks or a series of shell tasks with the commands you mentioned.
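For reference, the three commands from the question can be written down as argument lists before choosing between Docker tasks and shell tasks. This is only a sketch in plain Python, not a Prefect flow: the helper names `build_renew_commands` and `renew_container` are made up for illustration, the network name is passed in because the value behind `%x_Network%` is not given, and `docker rm -f` is used on the assumption that the container may still be running (a plain `docker rm` refuses to remove a running container).

```python
import subprocess
from typing import List


def build_renew_commands(image: str, name: str, network: str) -> List[List[str]]:
    """Return the pull / remove / run commands as argument lists (hypothetical helper)."""
    return [
        ["docker", "pull", image],
        # -f is an assumption: it force-removes the container even if it is running.
        ["docker", "rm", "-f", name],
        [
            "docker", "run", "-tid",
            "-p", "1883:1883",
            "-p", "9001:9001",
            f"--name={name}",
            "--restart=always",
            "--network", network,
            image,
        ],
    ]


def renew_container(image: str, name: str, network: str) -> None:
    """Run the commands in order; check=True stops at the first failing command."""
    for command in build_renew_commands(image, name, network):
        subprocess.run(command, check=True)


if __name__ == "__main__":
    # Printing instead of executing keeps this sketch safe to run without Docker.
    for command in build_renew_commands("eclipse-mosquitto", "msg-broker", "prefect-server"):
        print(" ".join(command))
```

Keeping the commands as data makes the order of the three steps explicit, and each list could be handed to a shell task or replaced by the matching Docker task later.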

Kai Weber

07/06/2020, 2:33 PM
Hi @nicholas, I'm trying to migrate my Docker Windows batch files to Prefect. I want to (re)start all of my Docker containers with a flow, and I want to register (and run) the flow on my Prefect server. I don't know whether that affects the use of shell tasks. For my taste, a series of shell tasks is not that elegant.

nicholas

07/06/2020, 3:19 PM
@Kai Weber can you provide some minimum code for what you're trying? It seems to me you could retrieve a list of containers using the ListContainers task, map over that list and stop those containers using the StopContainer task, and then start them again based on another list you provide using the StartContainer task.

Kai Weber

07/06/2020, 4:11 PM
This is how far I am right now on my long way:
def Start_MQTT_MessageBroker(eclmosq_DI, eclmosq_Name, srvNet):
    logger = prefect.context.get("logger")
    logger.info("TASK **************************************************************")
    logger.info("** Starting MQTT Server")
    logger.info("** ====================")
    logger.info("** Docker-Image:   {}".format(eclmosq_DI))
    logger.info("** Server-Name:    {}".format(eclmosq_Name))
    logger.info("** Netzwerkname:   {}".format(srvNet))
    logger.info("*******************************************************************")
    strPull=PullImage(repository=eclmosq_DI)
    logger.info("** PULL IMAGE: %s ", strPull.__dict__.items())
    strList=ListImages()
    logger.info("** LIST IMAGE: %s ", strList.__dict__.items())
As a first step, I'm trying to pull the current image "eclipse-mosquitto".

nicholas

07/06/2020, 4:41 PM
@Kai Weber what behavior are you seeing? This isn't much to go on but it looks fine from what I can see.

Kai Weber

07/06/2020, 4:45 PM
This is my code:
import os
import prefect
from prefect import Flow, task, Parameter
from prefect.tasks.docker import (
    CreateContainer,
    StartContainer,
    GetContainerLogs,
    WaitOnContainer,
    ListImages,
    PullImage
)
from prefect.tasks.control_flow import ifelse

@task
def Load_ServerConfiguration(devPath):
    logger = prefect.context.get("logger")
    logger.info("TASK **************************************************************")
    logger.info("Reading configuration data (Parameters)")
    logger.info("Serverumgebung: {}".format(devPath))

    #logger.info("Serverumgebung: ", devPath)
    #for root, dirs, files in os.walk("."):
    #    for filename in files:
    #        logger.info(filename)
    logger.info("*******************************************************************")

@task
def Start_MQTT_MessageBroker(eclmosq_DI, eclmosq_Name, srvNet):
    logger = prefect.context.get("logger")
    logger.info("TASK **************************************************************")
    logger.info("** Starting MQTT Server")
    logger.info("** ====================")
    logger.info("** Docker-Image:   {}".format(eclmosq_DI))
    logger.info("** Server-Name:    {}".format(eclmosq_Name))
    logger.info("** Netzwerkname:   {}".format(srvNet))
    logger.info("*******************************************************************")
    strPull=PullImage(repository=eclmosq_DI)
    logger.info("** PULL IMAGE: %s ", strPull.__dict__.items())
    strList=ListImages()
    logger.info("** LIST IMAGE: %s ", strList.__dict__.items())
    return [1, 2, 3]

@task
def transform(x):
    return [i * 10 for i in x]


@task
def load(y):
    print("Received y: {}".format(y))

with Flow("BASIC_SpinOffServers") as flow:
    devPath         = Parameter("ServerEnv", default="~")
    srvNet          = Parameter("ServerNet", default="prefect-server")
    eclmosq_DI      = Parameter ("EclMosq_DI", default="eclipse-mosquitto")
    eclmosq_Name    = Parameter ("EclMosq_Name", default="MQTT_Machine")
#    print("XXX:", airport)
    Load_ServerConfiguration(devPath)
    e = Start_MQTT_MessageBroker(srvNet, eclmosq_DI, eclmosq_Name)
    #t = transform(e)
    #l = load(t)

#flow.register()
#flow.run()
flow.run(ServerEnv="C:/Projekte/Lokal/software/develop/GitHub/SuperAIDev", ServerNet="prefect-server", EclMosq_DI = "eclipse-mosquitto", EclMosq_Name="MQTT_Machine")
and this is the output:
C:\Lokal-C\software\develop\Anaconda3\python.exe C:/Projekte/Lokal/software/develop/GitHub/SuperAIDev/.prefect/flows/BASIC_SpinOffServers.py
[2020-07-06 16:44:30] INFO - prefect.FlowRunner | Beginning Flow run for 'BASIC_SpinOffServers'
[2020-07-06 16:44:30] INFO - prefect.FlowRunner | Starting flow run.
[2020-07-06 16:44:30] INFO - prefect.TaskRunner | Task 'ServerNet': Starting task run...
[2020-07-06 16:44:30] INFO - prefect.TaskRunner | Task 'ServerNet': finished task run for task with final state: 'Success'
[2020-07-06 16:44:30] INFO - prefect.TaskRunner | Task 'EclMosq_DI': Starting task run...
[2020-07-06 16:44:30] INFO - prefect.TaskRunner | Task 'EclMosq_DI': finished task run for task with final state: 'Success'
[2020-07-06 16:44:30] INFO - prefect.TaskRunner | Task 'ServerEnv': Starting task run...
[2020-07-06 16:44:30] INFO - prefect.TaskRunner | Task 'ServerEnv': finished task run for task with final state: 'Success'
[2020-07-06 16:44:31] INFO - prefect.TaskRunner | Task 'EclMosq_Name': Starting task run...
[2020-07-06 16:44:31] INFO - prefect.TaskRunner | Task 'EclMosq_Name': finished task run for task with final state: 'Success'
[2020-07-06 16:44:31] INFO - prefect.TaskRunner | Task 'Load_ServerConfiguration': Starting task run...
[2020-07-06 16:44:31] INFO - prefect.Load_ServerConfiguration | TASK **************************************************************
[2020-07-06 16:44:31] INFO - prefect.Load_ServerConfiguration | Reading configuration data (Parameters)
[2020-07-06 16:44:31] INFO - prefect.Load_ServerConfiguration | Serverumgebung: C:/Projekte/Lokal/software/develop/GitHub/SuperAIDev
[2020-07-06 16:44:31] INFO - prefect.Load_ServerConfiguration | *******************************************************************
[2020-07-06 16:44:31] INFO - prefect.TaskRunner | Task 'Load_ServerConfiguration': finished task run for task with final state: 'Success'
[2020-07-06 16:44:31] INFO - prefect.TaskRunner | Task 'Start_MQTT_MessageBroker': Starting task run...
[2020-07-06 16:44:31] INFO - prefect.Start_MQTT_MessageBroker | TASK **************************************************************
[2020-07-06 16:44:31] INFO - prefect.Start_MQTT_MessageBroker | ** Starting MQTT Server
[2020-07-06 16:44:31] INFO - prefect.Start_MQTT_MessageBroker | ** ====================
[2020-07-06 16:44:31] INFO - prefect.Start_MQTT_MessageBroker | ** Docker-Image: prefect-server
[2020-07-06 16:44:31] INFO - prefect.Start_MQTT_MessageBroker | ** Server-Name: eclipse-mosquitto
[2020-07-06 16:44:31] INFO - prefect.Start_MQTT_MessageBroker | ** Netzwerkname: MQTT_Machine
[2020-07-06 16:44:31] INFO - prefect.Start_MQTT_MessageBroker | *******************************************************************
[2020-07-06 16:44:31] INFO - prefect.Start_MQTT_MessageBroker | ** PULL IMAGE: dict_items([('repository', 'prefect-server'), ('tag', None), ('docker_server_url', 'unix:///var/run/docker.sock'), ('name', 'PullImage'), ('slug', 'b9d634bb-d7df-4727-b64c-d4f217ac58e2'), ('logger', <Logger prefect.PullImage (INFO)>), ('tags', set()), ('max_retries', 0), ('retry_delay', None), ('timeout', None), ('trigger', <function all_successful at 0x0000026085125438>), ('skip_on_upstream_skip', True), ('cache_for', None), ('cache_key', None), ('cache_validator', <function never_use at 0x0000026083D93EE8>), ('checkpoint', None), ('result', None), ('target', None), ('state_handlers', []), ('auto_generated', False), ('log_stdout', False)])
[2020-07-06 16:44:31] INFO - prefect.Start_MQTT_MessageBroker | ** LIST IMAGE: dict_items([('repository_name', None), ('all_layers', False), ('filters', None), ('docker_server_url', 'unix:///var/run/docker.sock'), ('name', 'ListImages'), ('slug', '61ddd961-74a1-46dd-928a-13c10f31c35f'), ('logger', <Logger prefect.ListImages (INFO)>), ('tags', set()), ('max_retries', 0), ('retry_delay', None), ('timeout', None), ('trigger', <function all_successful at 0x0000026085125438>), ('skip_on_upstream_skip', True), ('cache_for', None), ('cache_key', None), ('cache_validator', <function never_use at 0x0000026083D93EE8>), ('checkpoint', None), ('result', None), ('target', None), ('state_handlers', []), ('auto_generated', False), ('log_stdout', False)])
[2020-07-06 16:44:31] INFO - prefect.TaskRunner | Task 'Start_MQTT_MessageBroker': finished task run for task with final state: 'Success'
[2020-07-06 16:44:31] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
Process finished with exit code 0
I was hoping to see the current Mosquitto image in the output.
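One detail in the log above is worth noting: it reports `Docker-Image: prefect-server` and `Netzwerkname: MQTT_Machine`. That is consistent with the flow calling `Start_MQTT_MessageBroker(srvNet, eclmosq_DI, eclmosq_Name)` positionally while the function is declared as `(eclmosq_DI, eclmosq_Name, srvNet)`. The sketch below reproduces the effect with a plain Python function and no Prefect involved; `describe_broker` is a stand-in name chosen for illustration.

```python
def describe_broker(eclmosq_DI, eclmosq_Name, srvNet):
    """Stand-in with the same parameter order as Start_MQTT_MessageBroker."""
    return {"image": eclmosq_DI, "name": eclmosq_Name, "network": srvNet}


srvNet = "prefect-server"
eclmosq_DI = "eclipse-mosquitto"
eclmosq_Name = "MQTT_Machine"

# Positional call in the order used in the flow: values land in the wrong slots.
shifted = describe_broker(srvNet, eclmosq_DI, eclmosq_Name)

# Keyword call: the order of the arguments no longer matters.
correct = describe_broker(eclmosq_DI=eclmosq_DI, eclmosq_Name=eclmosq_Name, srvNet=srvNet)

print(shifted)
print(correct)
```

Passing the arguments by keyword keeps each value attached to the parameter it belongs to.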

nicholas

07/06/2020, 4:52 PM
Ah @Kai Weber - since `PullImage` and `ListImages` are Prefect tasks, you'll want to call those from within the Flow context, instead of within another task.

Kai Weber

07/06/2020, 4:57 PM
@nicholas - Is it a stupid question to ask what it means to call the tasks from within the Flow context? What would that look like? Sorry, I am just starting...

nicholas

07/06/2020, 5:06 PM
No worries at all! You'll want to call tasks like this instead:
from prefect import Flow, task, Parameter
from prefect.tasks.docker import (
    CreateContainer,
    StartContainer,
    GetContainerLogs,
    WaitOnContainer,
    ListImages,
    PullImage
)

with Flow("BASIC_SpinOffServers") as flow:
  eclmosq_DI = Parameter ("EclMosq_DI", default="eclipse-mosquitto")

  pullImage = PullImage(repository=eclmosq_DI)

  listImages = ListImages()

  ...

Kai Weber

07/07/2020, 1:55 PM
@nicholas - it is still not working. Now I get the following error:
C:\Lokal-C\software\develop\Anaconda3\python.exe C:/Projekte/Lokal/software/develop/GitHub/SuperAIDev/.prefect/flows/BASIC_SpinOffServers.py
None
{<Task: Load_ServerConfiguration>, <Parameter: ServerEnv>, <Parameter: EclMosq_Name>, <Parameter: EclMosq_DI>, <Task: Start_MQTT_MessageBroker>, <Parameter: ServerNet>}
[2020-07-07 13:51:48] INFO - prefect.FlowRunner | Beginning Flow run for 'BASIC_SpinOffServers'
[2020-07-07 13:51:48] INFO - prefect.FlowRunner | Starting flow run.
C:\Lokal-C\software\develop\Anaconda3\Lib\site-packages\prefect\core\flow.py:342: UserWarning: Tasks were created but not added to the flow: {<Task: ListImages>, <Task: PullImage>}. This can occur when `Task` classes, including `Parameters`, are instantiated inside a `with flow:` block but not added to the flow either explicitly or as the input to another task. For more information, see https://docs.prefect.io/core/advanced_tutorials/task-guide.html#adding-tasks-to-flows.
  "Tasks were created but not added to the flow: "
[2020-07-07 13:51:48] INFO - prefect.TaskRunner | Task 'ServerEnv': Starting task run...
Sorry this was my flow:
with Flow("BASIC_SpinOffServers") as flow:
    eclmosq_DI = Parameter ("EclMosq_DI", default="eclipse-mosquitto")
    pullImage = PullImage(repository=eclmosq_DI)
    listImages = ListImages()


    Load_ServerConfiguration(devPath)
    e = Start_MQTT_MessageBroker(eclmosq_DI=eclmosq_DI, eclmosq_Name=eclmosq_Name, srvNet=srvNet)

print(listImages.result)
print(flow.tasks)

flow.run(parameters=dict(ServerEnv="C:/Projekte/Lokal/software/develop/GitHub/SuperAIDev", ServerNet="prefect-server",
         EclMosq_DI="eclipse-mosquitto", EclMosq_Name="MQTT_Machine"))
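
The warning quoted above says that `PullImage` and `ListImages` were instantiated inside the `with` block but never added to the flow. In Prefect 0.x a task class is usually instantiated first and then called, and it is the call that binds it to the active flow, for example `PullImage()(repository=eclmosq_DI)`. The sketch below imitates that pattern with two tiny stand-in classes so it runs without Prefect or Docker; `MiniFlow` and `MiniTask` are invented for illustration and are not Prefect's implementation.

```python
class MiniFlow:
    """Stand-in for a flow: a context manager that collects tasks."""

    active = None  # the flow whose `with` block is currently open

    def __init__(self, name):
        self.name = name
        self.tasks = []

    def __enter__(self):
        MiniFlow.active = self
        return self

    def __exit__(self, exc_type, exc, tb):
        MiniFlow.active = None
        return False


class MiniTask:
    """Stand-in for a task class: constructing it does not register it."""

    def __init__(self, name):
        self.name = name
        self.inputs = {}

    def __call__(self, **inputs):
        # Only the call binds inputs and adds the task to the open flow.
        if MiniFlow.active is None:
            raise RuntimeError("called outside a flow context")
        self.inputs = inputs
        MiniFlow.active.tasks.append(self)
        return self


with MiniFlow("BASIC_SpinOffServers") as flow:
    # Instantiated but never called: stays outside the flow.
    only_created = MiniTask("ListImages")
    # Instantiated, then called: bound to the flow.
    pull = MiniTask("PullImage")(repository="eclipse-mosquitto")

print([t.name for t in flow.tasks])  # prints ['PullImage']
```

In the stand-in, only the task that was called ends up in `flow.tasks`, which mirrors the situation the warning describes for the two Docker tasks.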