Sean Talia
02/18/2021, 9:22 PM
Has anyone used the BatchSubmit task before? I'm just starting to play around with it and am curious as to exactly what actions I'm going to need to include in my IAM policy allow list for the host on which the task will execute (besides, obviously, the batch:SubmitJob action).
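For what it's worth, a minimal sketch of how I'd expect the task to be wired up -- the argument names are my recollection of the 0.14 docs, so double-check the signature, and the job/queue names are placeholders:

from prefect import Flow
from prefect.tasks.aws.batch import BatchSubmit

# placeholders for an existing Batch job definition and queue
submit_job = BatchSubmit(
    job_name="example-run",
    job_definition="example-job-definition",
    job_queue="example-queue",
)

with Flow("batch-submit-example") as flow:
    submit_job()

# As far as I can tell the task just wraps boto3's batch.submit_job, so
# batch:SubmitJob is the action it exercises directly; anything beyond that
# (e.g. batch:DescribeJobs for polling) depends on what else your flow does.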
matta
02/18/2021, 10:43 PM
Ajith Kumara Beragala Acharige Lal
02/19/2021, 1:31 AM
prefect auth create-token -n my-runner-token -s RUNNER
I get this error.
matta
02/19/2021, 4:31 AM
I'm using DaskKubernetesEnvironment for flows in there, and it launches even when I don't specify an image (which is awesome!). Just out of curiosity, what exactly am I launching? Is it the same image as the container of the JupyterHub instance?
Harshal Rane
02/19/2021, 6:21 AM
Milly gupta
02/19/2021, 3:06 PM
Varun Joshi
02/19/2021, 3:27 PM
Ajith Kumara Beragala Acharige Lal
02/19/2021, 5:29 PM
Is there a way to map the UI service (service/prefect-server-ui) to a prefect-web-host-name (e.g. prefect.mycompany.com) when installing Prefect Server via Helm, so that internal users can access prefect-server via a hostname instead of an IP? https://github.com/PrefectHQ/server/blob/master/helm/prefect-server/values.yaml
Tony
02/19/2021, 6:59 PM
I'm trying to track down a flow not found error when using Docker storage, but having mixed results.
I'm working on a CICD script for any arbitrary repo to loop through a directory of flow files, build container storage (can be many containers or a single one, I'm fine with either), and then register them. It starts by copying and building my repo into a container, then hopefully builds that container with all the prefect goodies, and sets the run_config:
image_name = f"{git_repo}/prefect-{Path(flow_file).stem.lower()}"
image_tag = f"githash-{git_commit}"

# built by cicd agent
base_image = f"{REGISTRY_URL}/{git_repo}/flows:{image_tag}"
# second image built
base_image_plus_prefect = f"{REGISTRY_URL}/{image_name}:{image_tag}"

flow.storage = Docker(
    registry_url=REGISTRY_URL,
    base_image=base_image,
    image_name=image_name,
    image_tag=image_tag,
    env_vars=fetch_env_vars(os.environ["GIT_BRANCH"]),
    # here be the trouble
    # same path as my Dockerfile???
    path=f"/repo/{flow_file}",
    stored_as_script=True,
)
flow.storage.build()

flow.run_config = DockerRun(
    image=base_image_plus_prefect,
    labels=fetch_agent_labels(os.environ["GIT_BRANCH"]),
)

... cicd stuff ...

flow.register()
I'm currently getting ValueError('Flow is not contained in this Storage') when running this.
First option I see is specifying Docker(…, files={…}), yet some repos might have dozens or hundreds of extra files that they need to include. Any chance files takes wildcard pathing?
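One possible workaround (a sketch, assuming files= accepts a plain {local_path: path_in_image} dict as the docs describe) is to build that dict yourself from a glob:

from pathlib import Path

repo_root = Path("/repo")  # hypothetical local checkout location

# expand whatever patterns you need into the files mapping yourself
extra_files = {
    str(p): f"/repo/{p.relative_to(repo_root)}"
    for p in repo_root.rglob("*.py")
    if p.is_file()
}

flow.storage = Docker(
    registry_url=REGISTRY_URL,
    base_image=base_image,
    image_name=image_name,
    image_tag=image_tag,
    files=extra_files,
    path=f"/repo/{flow_file}",
    stored_as_script=True,
)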
Second option I see is [multi-flow storage], but then I run into CICD problems:
File "/opt/prefect/healthcheck.py", line 151, in <module>
flows = cloudpickle_deserialization_check(flow_file_paths)
File "/opt/prefect/healthcheck.py", line 44, in cloudpickle_deserialization_check
flows.append(cloudpickle.loads(flow_bytes))
ModuleNotFoundError: No module named 'flows.<my flow name>'
Anyone have some better ideas?
Omar Sultan
02/20/2021, 12:49 PM
Omar Sultan
02/20/2021, 12:49 PM
Jack Sundberg
02/20/2021, 6:09 PM
Jack Sundberg
02/21/2021, 1:38 AM
Right now I create runs with client.create_flow_run(flow_id). But this requires me to store or know the flow_id, which I'd rather not do between separate scripts. Is there a reason this type of submission isn't supported?
client.create_flow_run(project_name, flow_name)
The CLI supports this, so I expected the Python API for Client to as well. If it's alright, I'd like to open a feature request on GitHub. I think the fix will be easy. Something like:
def create_flow_run(flow_id=None, project_name=None, flow_name=None):
    if not flow_id and not (project_name and flow_name):
        raise ValueError("Either a flow_id or a project_name+flow_name must be provided")
    # then add if/else to handle proper graphql mutation
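In the meantime, a workaround sketch that should get the same effect -- look the id up first with client.graphql. The query shape below is an assumption based on the interactive API schema, so verify the field names:

from prefect import Client

client = Client()

def create_flow_run_by_name(project_name, flow_name):
    # grab the newest non-archived version of the flow
    result = client.graphql(
        """
        query($flow_name: String, $project_name: String) {
          flow(
            where: {
              name: {_eq: $flow_name},
              project: {name: {_eq: $project_name}},
              archived: {_eq: false}
            }
            order_by: {version: desc}
            limit: 1
          ) {
            id
          }
        }
        """,
        variables={"flow_name": flow_name, "project_name": project_name},
    )
    flow_id = result.data.flow[0].id
    return client.create_flow_run(flow_id=flow_id)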
Varun Joshi
02/21/2021, 2:30 PM
1. I've added a schedule:
schedule = IntervalSchedule(start_date=datetime.datetime.now() + datetime.timedelta(seconds=1), interval=datetime.timedelta(minutes=1),)
as described on the website. Then why don't I see the flow running every minute?
2. If I make any changes to my flow code, will just running it again reflect the changes in the UI?
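For reference, a minimal sketch of how that schedule is usually attached and registered (flow and project names are made up) -- the flow has to be registered for the backend to create scheduled runs, and an agent has to be running for them to actually execute:

import datetime
from prefect import Flow, task
from prefect.schedules import IntervalSchedule

schedule = IntervalSchedule(
    start_date=datetime.datetime.now() + datetime.timedelta(seconds=1),
    interval=datetime.timedelta(minutes=1),
)

@task
def say_hello():
    print("hello")

# the schedule is attached when the flow is constructed
with Flow("scheduled-example", schedule=schedule) as flow:
    say_hello()

flow.register(project_name="examples")  # hypothetical project name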
Anthony LE LUYER
02/22/2021, 12:35 PM
Igor Dykhta
02/22/2021, 2:06 PM
Igor Dykhta
02/22/2021, 2:40 PM
Access denied (403) Current session has been terminated. For further information, do not hesitate to contact us.
Andy
02/22/2021, 2:48 PM
Is there something like flow = load('{project_name}.{flow}') and flow.run() which would then show up in the Prefect UI?
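The closest equivalent I know of is the StartFlowRun task, which creates the run on the backend so it shows up in the UI -- a sketch with made-up project/flow names:

from prefect.tasks.prefect import StartFlowRun

kickoff = StartFlowRun(project_name="my-project", flow_name="my-flow", wait=True)
kickoff.run()  # creates (and here, waits on) a backend flow run visible in the UI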
Robert Bastian
02/22/2021, 3:07 PM
vish
02/22/2021, 3:17 PM
I'm using PandasSerializer and GCSResult to convert the output of a task from a pandas DataFrame to a partitioned Parquet dataset before uploading to GCS. It worked well when serializing to a single parquet file (without partitions).
However, when using partitions, the serializer fails. My initial look into the source code revealed that the Result class expects a binary stream from the serializer, whereas pandas.to_parquet(*, partition_cols=["col1"]) does not return binary output, since it creates multiple parquet files.
Example code
pandas_serializer = PandasSerializer(
    file_type="parquet",
    serialize_kwargs=dict(engine="pyarrow", partition_cols=["date"]),
)

gcs_result = GCSResult(
    bucket="bucket-name", serializer=pandas_serializer
)

class Extract(Task):
    def __init__(self, **kwargs):
        super().__init__(**kwargs, result=gcs_result, target="target_file_location")

    def run(self, df: pd.DataFrame) -> pd.DataFrame:
        # Some processing steps
        return df
From my assessment, it looks like this pattern (i.e. serializing to multiple files) is not supported right now? If that is the case, what are your thoughts on the "Prefect way" to achieve the above.
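For illustration, one workaround -- not an official pattern -- is to skip the Result serializer for this step and write the partitioned dataset inside the task, returning the GCS prefix instead of the DataFrame (assumes gcsfs is installed and a pandas version that accepts gs:// paths here):

import pandas as pd
from prefect import task

@task
def extract_to_gcs(df: pd.DataFrame) -> str:
    # ... some processing steps ...
    path = "gs://bucket-name/datasets/extract"  # hypothetical prefix
    df.to_parquet(path, engine="pyarrow", partition_cols=["date"])
    return path  # downstream tasks get the prefix rather than the DataFrame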
Thanneermalai Lakshmanan
02/22/2021, 3:39 PM
Gelinger Media
02/22/2021, 4:14 PM
bral
02/22/2021, 4:23 PM
itay livni
02/22/2021, 5:13 PM
• SKIP signal raised: SKIP('Provided value "True" did not match "False"').
• In Prefect Cloud, the case statement continues to run indefinitely even though the flow has terminated.
## Logs
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)': Starting task run...
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)': Finished task run for task with final state: 'Mapped'
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)[0]': Starting task run...
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(True)[0]': Finished task run for task with final state: 'Success'
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)': Starting task run...
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)': Finished task run for task with final state: 'Mapped'
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)[0]': Starting task run...
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | SKIP signal raised: SKIP('Provided value "True" did not match "False"')
[2021-02-22 08:49:04-0600] INFO - prefect.TaskRunner | Task 'case(False)[0]': Finished task run for task with final state: 'Skipped'
Any thoughts on what is going on here? Thanks
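For reference, the usual case/merge pattern looks roughly like this (task names are made up, just for context on how the branches normally resolve):

from prefect import Flow, task, case
from prefect.tasks.control_flow import merge

@task
def check_condition():
    return True

@task
def true_branch():
    return "ran the True branch"

@task
def false_branch():
    return "ran the False branch"

with Flow("case-example") as flow:
    cond = check_condition()
    with case(cond, True):
        t = true_branch()
    with case(cond, False):
        f = false_branch()
    result = merge(t, f)  # whichever branch wasn't skipped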
Dave
02/22/2021, 6:51 PM
itay livni
02/22/2021, 9:12 PM
I'm having an issue using StartFlowRun in a task, where the flow inside the task only runs once. In the example below I would expect it to run three times.
Using
state = adder.run(parameters=dict(num_lst=num_lst))
# Get results
param_tsk = adder.get_tasks("add_num")
num_lst = state.result[param_tsk[0]]._result.value
works as expected. Any thoughts on how to run a Flow in a loop with Cloud? Thanks!
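A sketch of how I'd try the loop against Cloud -- map a StartFlowRun task over the parameter sets (flow/project names are placeholders):

from prefect import Flow
from prefect.tasks.prefect import StartFlowRun

adder_run = StartFlowRun(flow_name="adder", project_name="my-project", wait=True)

with Flow("run-adder-in-a-loop") as loop_flow:
    # one child flow run per parameter dict
    adder_run.map(parameters=[{"num_lst": [1]}, {"num_lst": [2]}, {"num_lst": [3]}])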
Sean Talia
02/22/2021, 9:53 PM
I see that the agent calls the register_task_definition() method. Does this mean that even if the contents of the task definition haven't actually changed (e.g. the flow is never re-registered, or it's registered with an identical task definition), the Agent is still going to register a new task definition with AWS on each and every flow run?
ludwig
02/22/2021, 10:34 PM
Fina Silva-Santisteban
02/22/2021, 10:52 PM
Currently I log in to ECR with the AWS CLI:
aws ecr get-login-password --region regionname | docker login --username AWS --password-stdin xxxxxx.dkr.ecr.us-east-2.amazonaws.com
I've tried using boto3 to do a docker login right before the register() call, but it seems like the auth token expires as soon as I run register:
denied: Your authorization token has expired. Reauthenticate and try again.
Is it possible to have Prefect do the docker login? Or do I always have to use the AWS CLI?
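For illustration, a sketch of doing the ECR login from Python right before register(), with boto3 plus the docker SDK -- not a built-in Prefect feature as far as I know; region and project names are placeholders:

import base64
import boto3
import docker

ecr = boto3.client("ecr", region_name="us-east-2")
auth = ecr.get_authorization_token()["authorizationData"][0]
username, password = base64.b64decode(auth["authorizationToken"]).decode().split(":")

docker_client = docker.from_env()
docker_client.login(username=username, password=password, registry=auth["proxyEndpoint"])

# ECR tokens last roughly 12 hours, so logging in immediately before the
# build/push should avoid the expiry error
flow.register(project_name="my-project")  # hypothetical project name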
Ajith Kumara Beragala Acharige Lal
02/23/2021, 1:07 AM
I get an InterruptedError: denied: access forbidden error when Prefect is "Pushing image to the registry...". Can anyone help me - how/where do I specify the registry username and password in the code, or anywhere else it matters?
import prefect
from prefect import task, Flow
from prefect.environments.storage import Docker
from prefect.run_configs import KubernetesRun

@task(log_stdout=True)
def log_my_stdout(print_message):
    print("I will log this ", print_message)

with Flow("KubeTestFlow") as f:
    log_my_stdout("Example!")

f.storage = Docker(registry_url="hakko.sekai.dev:5050/sekai-backend/", image_name="k8s-job-flow", image_tag="0.1.0")
f.run_config = KubernetesRun(labels=["DEV"], job_template_path="/home/ajith/prefect/job_spec.yaml")

out = f.register(project_name="kubeTest")
Successfully tagged hakko.sekai.dev:5050/sekai-backend/k8s-job-flow:0.1.0
[2021-02-23 06:35:32+0530] INFO - prefect.Docker | Pushing image to the registry...
Traceback (most recent call last):
File "kubeTutorial.py", line 20, in <module>
out = f.register(project_name="kubeTest")
File "/usr/local/lib/python3.8/dist-packages/prefect/core/flow.py", line 1668, in register
registered_flow = client.register(
File "/usr/local/lib/python3.8/dist-packages/prefect/client/client.py", line 783, in register
serialized_flow = flow.serialize(build=build) # type: Any
File "/usr/local/lib/python3.8/dist-packages/prefect/core/flow.py", line 1450, in serialize
storage = self.storage.build() # type: Optional[Storage]
File "/usr/local/lib/python3.8/dist-packages/prefect/storage/docker.py", line 303, in build
self._build_image(push=push)
File "/usr/local/lib/python3.8/dist-packages/prefect/storage/docker.py", line 378, in _build_image
self.push_image(full_name, self.image_tag)
File "/usr/local/lib/python3.8/dist-packages/prefect/storage/docker.py", line 586, in push_image
raise InterruptedError(line.get("error"))
InterruptedError: denied: access forbidden
Fina Silva-Santisteban
02/23/2021, 1:10 AM
Ajith Kumara Beragala Acharige Lal
02/23/2021, 1:16 AM
Fina Silva-Santisteban
02/23/2021, 1:29 AM
In my setup I use run_task_kwargs for setting things like cluster name and launch type, but for that to work I must have set the permissions correctly on AWS. I save env variables in my Docker storage using the env_vars param and then use Prefect secrets to make them available. Even though it's not a Kubernetes or GitLab registry setup, I hope this helps!
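A sketch of the setup described above with placeholder values -- run_task_kwargs on ECSRun and env_vars on Docker storage are the real parameters, everything else is illustrative:

from prefect.run_configs import ECSRun
from prefect.storage import Docker

flow.storage = Docker(
    registry_url="xxxxxx.dkr.ecr.us-east-2.amazonaws.com",
    image_name="my-flow",                    # placeholder image name
    env_vars={"MY_SETTING": "value"},        # made available inside the image
)
flow.run_config = ECSRun(
    labels=["prod"],                         # placeholder agent label
    run_task_kwargs={"cluster": "my-cluster", "launchType": "FARGATE"},
)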
Ajith Kumara Beragala Acharige Lal
02/23/2021, 2:00 AM
Chris White
02/23/2021, 4:08 AM
Running the docker push commands manually from the command line might help you identify where the issue is.